use super::Variant;
use crate::{
marshal::{
ancestry::{AncestorStream, Ancestry, BlockProvider},
Identifier,
},
simplex::types::{Activity, Finalization, Notarization},
types::{Height, Round},
Reporter,
};
use commonware_actor::{
mailbox::{Overflow, Policy, Sender},
Feedback,
};
use commonware_cryptography::{certificate::Scheme, Digestible};
use commonware_p2p::Recipients;
use commonware_runtime::{telemetry::metrics::histogram::Timed, Clock};
use commonware_utils::{channel::oneshot, vec::NonEmptyVec};
use std::{
collections::{btree_map::Entry, BTreeMap, VecDeque},
sync::Arc,
};
/// Messages that can be sent through a [`Mailbox`].
///
/// Variants carrying a `response` (or `ack`) sender are answered over that
/// oneshot channel; the remaining variants are one-way notifications.
///
/// NOTE(review): the handling of each variant lives outside this file, so the
/// per-variant descriptions below follow the field types and variant names.
pub(crate) enum Message<S: Scheme, V: Variant> {
/// Request an optional `(height, digest)` pair for `identifier`.
GetInfo {
identifier: Identifier<<V::Block as Digestible>::Digest>,
response: oneshot::Sender<Option<(Height, <V::Block as Digestible>::Digest)>>,
},
/// Request the block for `identifier`, if any.
GetBlock {
identifier: Identifier<<V::Block as Digestible>::Digest>,
response: oneshot::Sender<Option<V::Block>>,
},
/// Request the finalization for `height`, if any.
GetFinalization {
height: Height,
response: oneshot::Sender<Option<Finalization<S, V::Commitment>>>,
},
/// Request the processed height, if any.
GetProcessedHeight {
response: oneshot::Sender<Option<Height>>,
},
/// Hint about a finalized `height`, carrying a non-empty list of public keys.
///
/// NOTE(review): `targets` are presumably the peers to ask for the
/// finalization; confirm against the actor.
HintFinalized {
height: Height,
targets: NonEmptyVec<S::PublicKey>,
},
/// Subscribe to the block with `digest`; the block is delivered over
/// `response`. `fallback` accompanies the subscription (see [`DigestFallback`]).
SubscribeByDigest {
digest: <V::Block as Digestible>::Digest,
fallback: DigestFallback,
response: oneshot::Sender<V::Block>,
},
/// Subscribe to the block with `commitment`; the block is delivered over
/// `response`. `fallback` accompanies the subscription (see [`CommitmentFallback`]).
SubscribeByCommitment {
commitment: V::Commitment,
fallback: CommitmentFallback,
response: oneshot::Sender<V::Block>,
},
/// Hint naming a `commitment` and the `round` it belongs to
/// (presumably a notarized one, per the variant name).
HintNotarized {
round: Round,
commitment: V::Commitment,
},
/// Request the verified block for `round`, if any.
GetVerified {
round: Round,
response: oneshot::Sender<Option<V::Block>>,
},
/// Forwarding request naming a `round`, a `commitment` and `recipients`.
Forward {
round: Round,
commitment: V::Commitment,
recipients: Recipients<S::PublicKey>,
},
/// A block tagged as proposed in `round`, with an optional acknowledgement
/// channel ([`Mailbox::proposed`] waits on it).
Proposed {
round: Round,
block: V::Block,
ack: Option<oneshot::Sender<()>>,
},
/// A block tagged as verified in `round`, with an optional acknowledgement
/// channel ([`Mailbox::verified`] waits on it).
Verified {
round: Round,
block: V::Block,
ack: Option<oneshot::Sender<()>>,
},
/// A block tagged as certified in `round`, with an optional acknowledgement
/// channel ([`Mailbox::certified`] waits on it).
Certified {
round: Round,
block: V::Block,
ack: Option<oneshot::Sender<()>>,
},
/// Floor update carrying a finalization (coalesced by [`Pending`] while queued).
SetFloor {
finalization: Finalization<S, V::Commitment>,
},
/// Prune request for `height` (coalesced by [`Pending`] while queued).
Prune {
height: Height,
},
/// A notarization reported through the [`Reporter`] implementation.
Notarization {
notarization: Notarization<S, V::Commitment>,
},
/// A finalization reported through the [`Reporter`] implementation.
Finalization {
finalization: Finalization<S, V::Commitment>,
},
}
/// Fallback selection attached to a digest subscription
/// (see [`Mailbox::subscribe_by_digest`]).
///
/// Every variant has a counterpart in [`CommitmentFallback`], and a `From`
/// conversion maps between them.
///
/// NOTE(review): how each variant is acted upon is decided outside this file;
/// the variant notes below follow the names and should be confirmed there.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DigestFallback {
/// Presumably: only wait for the block, without requesting it.
Wait,
/// Presumably: request the block using `round` as the lookup key.
FetchByRound { round: Round },
}
impl From<DigestFallback> for CommitmentFallback {
fn from(fallback: DigestFallback) -> Self {
match fallback {
DigestFallback::Wait => Self::Wait,
DigestFallback::FetchByRound { round } => Self::FetchByRound { round },
}
}
}
/// Fallback selection attached to a commitment subscription
/// (see [`Mailbox::subscribe_by_commitment`]).
///
/// A superset of [`DigestFallback`]: it adds `FetchByCommitment`.
///
/// NOTE(review): how each variant is acted upon is decided outside this file;
/// the variant notes below follow the names and should be confirmed there.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CommitmentFallback {
/// Presumably: only wait for the block, without requesting it.
Wait,
/// Presumably: request the block using `round` as the lookup key.
FetchByRound { round: Round },
/// Presumably: request the block by its commitment, with `height` as extra context.
FetchByCommitment { height: Height },
}
impl<S: Scheme, V: Variant> Message<S, V> {
    /// Reports whether this message is outdated relative to `current`.
    ///
    /// Height-addressed lookups (`GetInfo`/`GetBlock` by height and
    /// `GetFinalization`) are stale when strictly below `current`; a
    /// `HintFinalized` is stale when at or below it. Every other message is
    /// never stale, and nothing is stale while `current` is `None`.
    fn stale(&self, current: Option<Height>) -> bool {
        // Extract the height to compare and whether equality already counts
        // as stale; messages without a comparable height bail out here.
        let (height, inclusive) = match self {
            Self::GetFinalization { height, .. }
            | Self::GetBlock {
                identifier: Identifier::Height(height),
                ..
            }
            | Self::GetInfo {
                identifier: Identifier::Height(height),
                ..
            } => (*height, false),
            Self::HintFinalized { height, .. } => (*height, true),
            Self::GetInfo {
                identifier: Identifier::Digest(_) | Identifier::Latest,
                ..
            }
            | Self::GetBlock {
                identifier: Identifier::Digest(_) | Identifier::Latest,
                ..
            }
            | Self::GetProcessedHeight { .. }
            | Self::SubscribeByDigest { .. }
            | Self::SubscribeByCommitment { .. }
            | Self::HintNotarized { .. }
            | Self::GetVerified { .. }
            | Self::Forward { .. }
            | Self::Proposed { .. }
            | Self::Verified { .. }
            | Self::Certified { .. }
            | Self::SetFloor { .. }
            | Self::Prune { .. }
            | Self::Notarization { .. }
            | Self::Finalization { .. } => return false,
        };
        current.is_some_and(|current| {
            if inclusive {
                height <= current
            } else {
                height < current
            }
        })
    }

    /// Reports whether the requester has already dropped its response receiver.
    ///
    /// Only variants carrying a `response` sender can be closed; all others
    /// (including the block messages with an optional `ack`) return `false`.
    pub(crate) fn response_closed(&self) -> bool {
        // Arms are grouped by response payload type, since an or-pattern
        // requires identical binding types.
        match self {
            Self::SubscribeByDigest {
                response: waiter, ..
            }
            | Self::SubscribeByCommitment {
                response: waiter, ..
            } => waiter.is_closed(),
            Self::GetBlock {
                response: waiter, ..
            }
            | Self::GetVerified {
                response: waiter, ..
            } => waiter.is_closed(),
            Self::GetProcessedHeight { response: waiter } => waiter.is_closed(),
            Self::GetFinalization {
                response: waiter, ..
            } => waiter.is_closed(),
            Self::GetInfo {
                response: waiter, ..
            } => waiter.is_closed(),
            Self::HintFinalized { .. }
            | Self::HintNotarized { .. }
            | Self::Forward { .. }
            | Self::Proposed { .. }
            | Self::Verified { .. }
            | Self::Certified { .. }
            | Self::SetFloor { .. }
            | Self::Prune { .. }
            | Self::Notarization { .. }
            | Self::Finalization { .. } => false,
        }
    }
}
/// Overflow state for [`Message`]s (the `Overflow` type of its [`Policy`] impl).
///
/// Floor and prune updates are coalesced into single slots, finalized hints
/// are merged per height, and every other message is queued in arrival order.
pub(crate) struct Pending<S: Scheme, V: Variant> {
/// Pending floor finalization; only replaced by one with a higher round.
floor: Option<Finalization<S, V::Commitment>>,
/// Highest pending prune height; also the height used for staleness checks.
prune: Option<Height>,
/// Coalesced hint targets per height; each key is paired with a
/// `PendingMessage::HintFinalized` marker in `messages`.
hints: BTreeMap<Height, NonEmptyVec<S::PublicKey>>,
/// FIFO queue of buffered messages and hint markers.
messages: VecDeque<PendingMessage<S, V>>,
}
/// An entry in the [`Pending`] queue.
enum PendingMessage<S: Scheme, V: Variant> {
/// A buffered message, stored as-is.
Message(Message<S, V>),
/// Position marker for a finalized hint; the targets live in `Pending::hints`
/// under this height.
HintFinalized(Height),
}
impl<S: Scheme, V: Variant> Default for Pending<S, V> {
    /// Creates an empty overflow: no floor, no prune, no hints, no queued messages.
    fn default() -> Self {
        // Implemented by hand: a derive would demand `S: Default` and `V: Default`.
        let messages = VecDeque::new();
        let hints = BTreeMap::new();
        Self {
            floor: None,
            prune: None,
            hints,
            messages,
        }
    }
}
impl<S: Scheme, V: Variant> Pending<S, V> {
    /// Height used for staleness checks: the pending prune height, if any.
    const fn height(&self) -> Option<Height> {
        self.prune
    }

    /// Drops everything made obsolete by the current height.
    ///
    /// Hints at or below the height are removed together with their queue
    /// markers; queued messages are removed when their response receiver is
    /// gone or when they are stale.
    fn retain(&mut self) {
        let floor = self.height();
        let Self {
            hints, messages, ..
        } = self;
        hints.retain(|height, _| floor < Some(*height));
        messages.retain(|entry| match entry {
            // A marker survives only while its hint entry does.
            PendingMessage::HintFinalized(height) => hints.contains_key(height),
            PendingMessage::Message(message) => {
                !(message.response_closed() || message.stale(floor))
            }
        });
    }

    /// Records `finalization` as the pending floor unless the stored floor
    /// already has an equal or higher round.
    fn set_floor(&mut self, finalization: Finalization<S, V::Commitment>) {
        let superseded = match &self.floor {
            Some(existing) => existing.round() >= finalization.round(),
            None => false,
        };
        if !superseded {
            self.floor = Some(finalization);
        }
    }

    /// Raises the pending prune height to `height` and drops obsolete entries.
    ///
    /// A height that does not exceed the stored one is ignored (and no
    /// retention pass runs).
    fn prune(&mut self, height: Height) {
        let requested = Some(height);
        if requested <= self.prune {
            return;
        }
        // The staleness height just increased, so re-filter the queue.
        self.prune = requested;
        self.retain();
    }

    /// Appends every target not yet present in `pending`, preserving order.
    fn extend_hint_targets(
        pending: &mut NonEmptyVec<S::PublicKey>,
        targets: NonEmptyVec<S::PublicKey>,
    ) {
        for candidate in targets {
            if pending.contains(&candidate) {
                continue;
            }
            pending.push(candidate);
        }
    }

    /// Buffers a finalized hint, merging targets into an existing hint for the
    /// same height.
    ///
    /// Hints at or below the current height are discarded. A new height gets a
    /// marker at the back of the queue; a merged hint keeps its original position.
    fn hint_finalized(&mut self, height: Height, targets: NonEmptyVec<S::PublicKey>) {
        if Some(height) <= self.height() {
            return;
        }
        if let Some(existing) = self.hints.get_mut(&height) {
            Self::extend_hint_targets(existing, targets);
            return;
        }
        self.hints.insert(height, targets);
        self.messages
            .push_back(PendingMessage::HintFinalized(height));
    }

    /// Puts a hint that could not be delivered back at the front of the queue.
    fn restore_hint(&mut self, height: Height, targets: NonEmptyVec<S::PublicKey>) {
        match self.hints.entry(height) {
            Entry::Occupied(mut slot) => Self::extend_hint_targets(slot.get_mut(), targets),
            Entry::Vacant(slot) => {
                slot.insert(targets);
            }
        }
        self.messages
            .push_front(PendingMessage::HintFinalized(height));
    }

    /// Offers `message` to `push`.
    ///
    /// Returns `true` when `push` accepted it (returned `None`). When `push`
    /// hands the message back, it is restored to the overflow (floor/prune
    /// slots, hint map, or the front of the queue) and `false` is returned.
    fn drain_one<F>(&mut self, message: Message<S, V>, push: &mut F) -> bool
    where
        F: FnMut(Message<S, V>) -> Option<Message<S, V>>,
    {
        match push(message) {
            None => return true,
            Some(Message::SetFloor { finalization }) => self.set_floor(finalization),
            Some(Message::Prune { height }) => self.prune(height),
            Some(Message::HintFinalized { height, targets }) => {
                self.restore_hint(height, targets)
            }
            Some(other) => self.messages.push_front(PendingMessage::Message(other)),
        }
        false
    }
}
impl<S: Scheme, V: Variant> Overflow<Message<S, V>> for Pending<S, V> {
    /// Returns `true` when no floor, prune, hint or queued message is pending.
    fn is_empty(&self) -> bool {
        let Self {
            floor,
            prune,
            hints,
            messages,
        } = self;
        floor.is_none() && prune.is_none() && hints.is_empty() && messages.is_empty()
    }

    /// Feeds pending messages to `push` until one is handed back.
    ///
    /// The floor goes first, then the prune height, then the queue in order.
    /// Queued messages whose response receiver is gone are skipped, as are hint
    /// markers without a matching hint entry. The first message `push` returns
    /// is restored and draining stops.
    fn drain<F>(&mut self, mut push: F)
    where
        F: FnMut(Message<S, V>) -> Option<Message<S, V>>,
    {
        if let Some(finalization) = self.floor.take() {
            let delivered = self.drain_one(Message::SetFloor { finalization }, &mut push);
            if !delivered {
                return;
            }
        }
        if let Some(height) = self.prune.take() {
            let delivered = self.drain_one(Message::Prune { height }, &mut push);
            if !delivered {
                return;
            }
        }
        while let Some(entry) = self.messages.pop_front() {
            // Resolve the queue entry into a deliverable message, if any.
            let message = match entry {
                PendingMessage::Message(message) if message.response_closed() => continue,
                PendingMessage::Message(message) => message,
                PendingMessage::HintFinalized(height) => {
                    let Some(targets) = self.hints.remove(&height) else {
                        continue;
                    };
                    Message::HintFinalized { height, targets }
                }
            };
            if !self.drain_one(message, &mut push) {
                break;
            }
        }
    }
}
impl<S: Scheme, V: Variant> Policy for Message<S, V> {
    type Overflow = Pending<S, V>;

    /// Stores `message` in `overflow`.
    ///
    /// Messages whose response receiver is already gone are dropped. Floor,
    /// prune and finalized-hint messages are coalesced; anything else is
    /// appended to the queue unless it is stale against the pending height.
    fn handle(overflow: &mut Self::Overflow, message: Self) {
        if message.response_closed() {
            return;
        }
        match message {
            Self::Prune { height } => overflow.prune(height),
            Self::SetFloor { finalization } => overflow.set_floor(finalization),
            Self::HintFinalized { height, targets } => overflow.hint_finalized(height, targets),
            // Stale requests are silently discarded.
            queued if queued.stale(overflow.height()) => {}
            queued => overflow
                .messages
                .push_back(PendingMessage::Message(queued)),
        }
    }
}
/// Cloneable handle that enqueues [`Message`]s onto the wrapped [`Sender`].
#[derive(Clone)]
pub struct Mailbox<S: Scheme, V: Variant> {
// Sending half used by every method to enqueue messages.
sender: Sender<Message<S, V>>,
}
impl<S: Scheme, V: Variant> Mailbox<S, V> {
    /// Wraps `sender` in a mailbox.
    pub(crate) const fn new(sender: Sender<Message<S, V>>) -> Self {
        Self { sender }
    }

    /// Builds an [`AncestorStream`] over `initial` (converted with
    /// `V::into_inner`), using a clone of this mailbox as the block provider.
    pub(crate) fn ancestor_stream<I, C>(
        &self,
        clock: Arc<C>,
        initial: I,
        fetch_duration: Timed,
    ) -> impl Ancestry<V::ApplicationBlock> + use<S, V, I, C>
    where
        Self: BlockProvider<Block = V::ApplicationBlock>,
        I: IntoIterator<Item = V::Block>,
        C: Clock,
    {
        let provider = self.clone();
        let seed = initial.into_iter().map(V::into_inner);
        AncestorStream::new(clock, provider, seed, fetch_duration)
    }

    /// Requests the `(height, digest)` pair for `identifier`.
    ///
    /// Returns `None` when the responder answers `None` or drops the request.
    pub async fn get_info(
        &self,
        identifier: impl Into<Identifier<<V::Block as Digestible>::Digest>>,
    ) -> Option<(Height, <V::Block as Digestible>::Digest)> {
        let identifier = identifier.into();
        let (response, receiver) = oneshot::channel();
        let request = Message::GetInfo {
            identifier,
            response,
        };
        let _ = self.sender.enqueue(request);
        // A dropped responder short-circuits to `None`.
        receiver.await.ok()?
    }

    /// Requests the block for `identifier`.
    ///
    /// Returns `None` when the responder answers `None` or drops the request.
    pub async fn get_block(
        &self,
        identifier: impl Into<Identifier<<V::Block as Digestible>::Digest>>,
    ) -> Option<V::Block> {
        let identifier = identifier.into();
        let (response, receiver) = oneshot::channel();
        let request = Message::GetBlock {
            identifier,
            response,
        };
        let _ = self.sender.enqueue(request);
        receiver.await.ok()?
    }

    /// Requests the finalization for `height`.
    ///
    /// Returns `None` when the responder answers `None` or drops the request.
    pub async fn get_finalization(&self, height: Height) -> Option<Finalization<S, V::Commitment>> {
        let (response, receiver) = oneshot::channel();
        let request = Message::GetFinalization { height, response };
        let _ = self.sender.enqueue(request);
        receiver.await.ok()?
    }

    /// Requests the processed height.
    ///
    /// Returns `None` when the responder answers `None` or drops the request.
    pub async fn get_processed_height(&self) -> Option<Height> {
        let (response, receiver) = oneshot::channel();
        let request = Message::GetProcessedHeight { response };
        let _ = self.sender.enqueue(request);
        receiver.await.ok()?
    }

    /// Enqueues a finalized hint for `height` with the given `targets`.
    ///
    /// The enqueue feedback is ignored.
    pub fn hint_finalized(&self, height: Height, targets: NonEmptyVec<S::PublicKey>) {
        let hint = Message::HintFinalized { height, targets };
        let _ = self.sender.enqueue(hint);
    }

    /// Subscribes to the block with `digest`, returning the receiver on which
    /// the block is delivered.
    pub fn subscribe_by_digest(
        &self,
        digest: <V::Block as Digestible>::Digest,
        fallback: DigestFallback,
    ) -> oneshot::Receiver<V::Block> {
        let (response, receiver) = oneshot::channel();
        let subscription = Message::SubscribeByDigest {
            digest,
            fallback,
            response,
        };
        let _ = self.sender.enqueue(subscription);
        receiver
    }

    /// Subscribes to the block with `commitment`, returning the receiver on
    /// which the block is delivered.
    pub fn subscribe_by_commitment(
        &self,
        commitment: V::Commitment,
        fallback: CommitmentFallback,
    ) -> oneshot::Receiver<V::Block> {
        let (response, receiver) = oneshot::channel();
        let subscription = Message::SubscribeByCommitment {
            commitment,
            fallback,
            response,
        };
        let _ = self.sender.enqueue(subscription);
        receiver
    }

    /// Enqueues a notarized hint for `commitment` in `round`.
    ///
    /// The enqueue feedback is ignored.
    pub fn hint_notarized(&self, round: Round, commitment: V::Commitment) {
        let hint = Message::HintNotarized { round, commitment };
        let _ = self.sender.enqueue(hint);
    }

    /// Waits for the block with `start_digest` and builds an ancestry stream
    /// starting from it.
    ///
    /// Returns `None` when the subscription is dropped before a block arrives.
    pub async fn ancestry<C>(
        &self,
        clock: Arc<C>,
        (fallback, start_digest): (DigestFallback, <V::Block as Digestible>::Digest),
        fetch_duration: Timed,
    ) -> Option<impl Ancestry<V::ApplicationBlock> + use<S, V, C>>
    where
        Self: BlockProvider<Block = V::ApplicationBlock>,
        C: Clock,
    {
        let start = self
            .subscribe_by_digest(start_digest, fallback)
            .await
            .ok()?;
        Some(self.ancestor_stream(clock, [start], fetch_duration))
    }

    /// Requests the verified block for `round`.
    ///
    /// Returns `None` when the responder answers `None` or drops the request.
    pub async fn get_verified(&self, round: Round) -> Option<V::Block> {
        let (response, receiver) = oneshot::channel();
        let request = Message::GetVerified { round, response };
        let _ = self.sender.enqueue(request);
        receiver.await.ok()?
    }

    /// Submits `block` as proposed in `round` and waits for the acknowledgement.
    ///
    /// Returns `true` if the acknowledgement arrived, `false` if the
    /// acknowledgement channel was dropped first.
    #[must_use = "callers must consider block durability before proceeding"]
    pub async fn proposed(&self, round: Round, block: V::Block) -> bool {
        let (ack, receiver) = oneshot::channel();
        let notice = Message::Proposed {
            round,
            block,
            ack: Some(ack),
        };
        let _ = self.sender.enqueue(notice);
        receiver.await.is_ok()
    }

    /// Submits `block` as verified in `round` and waits for the acknowledgement.
    ///
    /// Returns `true` if the acknowledgement arrived, `false` if the
    /// acknowledgement channel was dropped first.
    #[must_use = "callers must consider block durability before proceeding"]
    pub async fn verified(&self, round: Round, block: V::Block) -> bool {
        let (ack, receiver) = oneshot::channel();
        let notice = Message::Verified {
            round,
            block,
            ack: Some(ack),
        };
        let _ = self.sender.enqueue(notice);
        receiver.await.is_ok()
    }

    /// Submits `block` as certified in `round` and waits for the acknowledgement.
    ///
    /// Returns `true` if the acknowledgement arrived, `false` if the
    /// acknowledgement channel was dropped first.
    #[must_use = "callers must consider block durability before proceeding"]
    pub async fn certified(&self, round: Round, block: V::Block) -> bool {
        let (ack, receiver) = oneshot::channel();
        let notice = Message::Certified {
            round,
            block,
            ack: Some(ack),
        };
        let _ = self.sender.enqueue(notice);
        receiver.await.is_ok()
    }

    /// Enqueues a floor update carrying `finalization`.
    ///
    /// The enqueue feedback is ignored.
    pub fn set_floor(&self, finalization: Finalization<S, V::Commitment>) {
        let update = Message::SetFloor { finalization };
        let _ = self.sender.enqueue(update);
    }

    /// Enqueues a prune request for `height`.
    ///
    /// The enqueue feedback is ignored.
    pub fn prune(&self, height: Height) {
        let request = Message::Prune { height };
        let _ = self.sender.enqueue(request);
    }

    /// Enqueues a forwarding request and returns the enqueue feedback.
    pub fn forward(
        &self,
        round: Round,
        commitment: V::Commitment,
        recipients: Recipients<S::PublicKey>,
    ) -> Feedback {
        let request = Message::Forward {
            round,
            commitment,
            recipients,
        };
        self.sender.enqueue(request)
    }
}
impl<S: Scheme, V: Variant> Reporter for Mailbox<S, V> {
    type Activity = Activity<S, V::Commitment>;

    /// Enqueues notarizations and finalizations; every other activity is
    /// acknowledged with `Feedback::Ok` without being enqueued.
    fn report(&mut self, activity: Self::Activity) -> Feedback {
        match activity {
            Activity::Finalization(finalization) => self
                .sender
                .enqueue(Message::Finalization { finalization }),
            Activity::Notarization(notarization) => self
                .sender
                .enqueue(Message::Notarization { notarization }),
            _ => Feedback::Ok,
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
marshal::{mocks::harness, standard::Standard},
simplex::{scheme::bls12381_threshold::vrf as bls12381_threshold_vrf, types::Proposal},
types::{Epoch, View},
Heightable,
};
use commonware_cryptography::{
certificate::mocks::Fixture, ed25519::PrivateKey, Digest as _, Signer as _,
};
use commonware_utils::{channel::oneshot::error::TryRecvError, test_rng_seeded};
// Concrete message and overflow types used by the tests, built from the mock
// harness scheme and block types.
type TestMessage = Message<harness::S, Standard<harness::B>>;
type TestPending = Pending<harness::S, Standard<harness::B>>;
// Derives a deterministic public key from `seed`.
fn public_key(seed: u64) -> harness::K {
    let signer = PrivateKey::from_seed(seed);
    signer.public_key()
}
// Builds a round in epoch zero whose view number equals `height`.
fn round(height: u64) -> Round {
    let view = View::new(height);
    Round::new(Epoch::zero(), view)
}
// Builds a raw harness block at `height`, also passing `height` as the last
// argument of `make_raw_block`.
fn block(height: u64) -> harness::B {
    let at = Height::new(height);
    harness::make_raw_block(harness::D::EMPTY, at, height)
}
// Commitment of the test block at `height`.
fn commitment(height: u64) -> harness::D {
    let raw = block(height);
    <Standard<harness::B> as Variant>::commitment(&raw)
}
// Builds a finalization for the test block at `height`, signed by a fixture
// generated from an RNG seeded with `height`.
fn finalization(height: u64) -> Finalization<harness::S, harness::D> {
    let mut seeded = test_rng_seeded(height);
    let Fixture { schemes, .. } = bls12381_threshold_vrf::fixture::<harness::V, _>(
        &mut seeded,
        harness::NAMESPACE,
        harness::NUM_VALIDATORS,
    );
    <harness::StandardHarness as harness::TestHarness>::make_finalization(
        Proposal::new(round(height), View::zero(), commitment(height)),
        &schemes,
        harness::QUORUM,
    )
}
// Builds a height-addressed `GetInfo` request plus its response receiver.
fn get_info(height: u64) -> (TestMessage, oneshot::Receiver<Option<(Height, harness::D)>>) {
    let (response, receiver) = oneshot::channel();
    let identifier = Identifier::Height(Height::new(height));
    let message = TestMessage::GetInfo {
        identifier,
        response,
    };
    (message, receiver)
}
// Builds a `Proposed` message for the test block at `height` plus its ack receiver.
fn proposed(height: u64) -> (TestMessage, oneshot::Receiver<()>) {
    let (ack, receiver) = oneshot::channel();
    let message = TestMessage::Proposed {
        round: round(height),
        block: block(height),
        ack: Some(ack),
    };
    (message, receiver)
}
// Builds a `Verified` message for the test block at `height` plus its ack receiver.
fn verified(height: u64) -> (TestMessage, oneshot::Receiver<()>) {
    let (ack, receiver) = oneshot::channel();
    let message = TestMessage::Verified {
        round: round(height),
        block: block(height),
        ack: Some(ack),
    };
    (message, receiver)
}
// Builds a `Certified` message for the test block at `height` plus its ack receiver.
fn certified(height: u64) -> (TestMessage, oneshot::Receiver<()>) {
    let (ack, receiver) = oneshot::channel();
    let message = TestMessage::Certified {
        round: round(height),
        block: block(height),
        ack: Some(ack),
    };
    (message, receiver)
}
// Builds a height-addressed `GetBlock` request plus its response receiver.
fn get_block(height: u64) -> (TestMessage, oneshot::Receiver<Option<harness::B>>) {
    let (response, receiver) = oneshot::channel();
    let identifier = Identifier::Height(Height::new(height));
    let message = TestMessage::GetBlock {
        identifier,
        response,
    };
    (message, receiver)
}
// Builds a `GetFinalization` request for `height` plus its response receiver.
fn get_finalization(
    height: u64,
) -> (
    TestMessage,
    oneshot::Receiver<Option<Finalization<harness::S, harness::D>>>,
) {
    let (response, receiver) = oneshot::channel();
    let height = Height::new(height);
    let message = TestMessage::GetFinalization { height, response };
    (message, receiver)
}
// Builds a digest subscription for the test block at `height`, using a
// `FetchByRound` fallback for the matching round, plus its receiver.
fn subscribe_by_digest(height: u64) -> (TestMessage, oneshot::Receiver<harness::B>) {
    let (response, receiver) = oneshot::channel();
    let digest = block(height).digest();
    let fallback = DigestFallback::FetchByRound {
        round: round(height),
    };
    let message = TestMessage::SubscribeByDigest {
        digest,
        fallback,
        response,
    };
    (message, receiver)
}
// Builds a commitment subscription for the test block at `height` with the
// given `fallback`, plus its receiver.
fn subscribe_by_commitment_message(
    height: u64,
    fallback: CommitmentFallback,
) -> (TestMessage, oneshot::Receiver<harness::B>) {
    let (response, receiver) = oneshot::channel();
    let commitment = commitment(height);
    let message = TestMessage::SubscribeByCommitment {
        commitment,
        fallback,
        response,
    };
    (message, receiver)
}
// Builds a finalized hint for `height` with a single target.
fn hint_finalized(height: u64, target: harness::K) -> TestMessage {
    let targets = NonEmptyVec::new(target);
    let height = Height::new(height);
    TestMessage::HintFinalized { height, targets }
}
// Builds a `SetFloor` message carrying the test finalization for `height`.
fn set_floor(height: u64) -> TestMessage {
    let finalization = finalization(height);
    TestMessage::SetFloor { finalization }
}
// Builds a `Prune` message for `height`.
fn prune(height: u64) -> TestMessage {
    let height = Height::new(height);
    TestMessage::Prune { height }
}
fn pending() -> TestPending {
TestPending::default()
}
// Drains `overflow` completely, accepting every message, and returns the
// messages in delivery order.
fn drain(overflow: &mut TestPending) -> VecDeque<TestMessage> {
    let mut collected: VecDeque<TestMessage> = VecDeque::new();
    overflow.drain(|message| {
        collected.push_back(message);
        // `None` tells the overflow the message was accepted.
        None
    });
    collected
}
// Whether the queue holds an open, height-addressed `GetInfo` for `height`.
fn has_get_info(overflow: &TestPending, height: u64) -> bool {
    let wanted = Height::new(height);
    overflow.messages.iter().any(|entry| match entry {
        PendingMessage::Message(TestMessage::GetInfo {
            identifier: Identifier::Height(found),
            response,
            ..
        }) => *found == wanted && !response.is_closed(),
        _ => false,
    })
}
// Whether the queue holds an open, height-addressed `GetBlock` for `height`.
fn has_get_block(overflow: &TestPending, height: u64) -> bool {
    let wanted = Height::new(height);
    overflow.messages.iter().any(|entry| match entry {
        PendingMessage::Message(TestMessage::GetBlock {
            identifier: Identifier::Height(found),
            response,
            ..
        }) => *found == wanted && !response.is_closed(),
        _ => false,
    })
}
// Whether the queue holds an open `GetFinalization` for `height`.
fn has_get_finalization(overflow: &TestPending, height: u64) -> bool {
    let wanted = Height::new(height);
    overflow.messages.iter().any(|entry| match entry {
        PendingMessage::Message(TestMessage::GetFinalization {
            height: found,
            response,
        }) => *found == wanted && !response.is_closed(),
        _ => false,
    })
}
// Targets of the pending hint for `height`, if one exists.
fn hint_targets(overflow: &TestPending, height: u64) -> Option<&NonEmptyVec<harness::K>> {
    let key = Height::new(height);
    overflow.hints.get(&key)
}
// Whether the queue holds a `Proposed`, `Verified` or `Certified` message
// whose block is at `height`.
fn has_block_message(overflow: &TestPending, height: u64) -> bool {
    let wanted = Height::new(height);
    overflow.messages.iter().any(|entry| match entry {
        PendingMessage::Message(
            TestMessage::Proposed { block, .. }
            | TestMessage::Verified { block, .. }
            | TestMessage::Certified { block, .. },
        ) => block.height() == wanted,
        _ => false,
    })
}
// Whether the pending prune height equals `height`.
fn has_prune(overflow: &TestPending, height: u64) -> bool {
    let wanted = Height::new(height);
    overflow.prune.is_some_and(|found| found == wanted)
}
// Whether the queue holds an open subscription (by digest or by commitment)
// for the test block at `height`.
fn has_subscription(overflow: &TestPending, height: u64) -> bool {
    let expected_digest = block(height).digest();
    let expected_commitment = commitment(height);
    overflow.messages.iter().any(|entry| match entry {
        PendingMessage::Message(TestMessage::SubscribeByDigest {
            digest, response, ..
        }) => *digest == expected_digest && !response.is_closed(),
        PendingMessage::Message(TestMessage::SubscribeByCommitment {
            commitment,
            response,
            ..
        }) => *commitment == expected_commitment && !response.is_closed(),
        _ => false,
    })
}
// Repeated hints for one height collapse into a single queue entry whose
// targets are de-duplicated.
#[test]
fn policy_coalesces_hint_targets() {
    let mut overflow = pending();
    let (first, second) = (public_key(1), public_key(2));
    // `first` is submitted twice on purpose to exercise de-duplication.
    for target in [&first, &first, &second] {
        <TestMessage as Policy>::handle(&mut overflow, hint_finalized(10, target.clone()));
    }

    assert_eq!(overflow.messages.len(), 1);
    let targets = hint_targets(&overflow, 10).expect("expected hint");
    assert_eq!(targets.len().get(), 2);
    for expected in [&first, &second] {
        assert!(targets.contains(expected));
    }
}
// Commitment subscriptions pass through the overflow in FIFO order with their
// fallback unchanged, one per `CommitmentFallback` variant.
#[test]
fn policy_preserves_commitment_subscription_fallbacks() {
let mut overflow = pending();
// Receivers are kept alive so the subscriptions are not treated as closed.
let (wait, _wait_rx) = subscribe_by_commitment_message(1, CommitmentFallback::Wait);
let (by_round, _by_round_rx) = subscribe_by_commitment_message(
2,
CommitmentFallback::FetchByRound { round: round(2) },
);
let (by_commitment, _by_commitment_rx) = subscribe_by_commitment_message(
3,
CommitmentFallback::FetchByCommitment {
height: Height::new(3),
},
);
<TestMessage as Policy>::handle(&mut overflow, wait);
<TestMessage as Policy>::handle(&mut overflow, by_round);
<TestMessage as Policy>::handle(&mut overflow, by_commitment);
let drained = drain(&mut overflow);
assert_eq!(drained.len(), 3);
assert!(matches!(
&drained[0],
TestMessage::SubscribeByCommitment {
fallback: CommitmentFallback::Wait,
..
}
));
assert!(matches!(
&drained[1],
TestMessage::SubscribeByCommitment {
fallback: CommitmentFallback::FetchByRound { round: found },
..
} if *found == round(2)
));
assert!(matches!(
&drained[2],
TestMessage::SubscribeByCommitment {
fallback: CommitmentFallback::FetchByCommitment { height },
..
} if *height == Height::new(3)
));
}
// A subscription whose receiver is gone is not queued by `handle`, while an
// open subscription already in the queue is left untouched.
#[test]
fn policy_handles_closed_subscriptions() {
let mut overflow = pending();
// Seed the queue directly with one closed and one open subscription.
let (pending_closed, pending_closed_rx) = subscribe_by_digest(1);
drop(pending_closed_rx);
overflow
.messages
.push_back(PendingMessage::Message(pending_closed));
let (pending_open, mut pending_open_rx) = subscribe_by_commitment_message(
2,
CommitmentFallback::FetchByRound { round: round(2) },
);
overflow
.messages
.push_back(PendingMessage::Message(pending_open));
// Submit a subscription that is already closed on arrival.
let (current_closed, current_closed_rx) = subscribe_by_digest(3);
drop(current_closed_rx);
<TestMessage as Policy>::handle(&mut overflow, current_closed);
// `has_subscription` only reports entries whose receiver is still open.
assert!(!has_subscription(&overflow, 1));
assert!(has_subscription(&overflow, 2));
assert!(!has_subscription(&overflow, 3));
// The open subscription has neither been answered nor dropped.
assert!(matches!(
pending_open_rx.try_recv(),
Err(TryRecvError::Empty)
));
}
// A request whose receiver is gone is not queued by `handle`, while an open
// request already in the queue is left untouched.
#[test]
fn policy_handles_closed_responses() {
let mut overflow = pending();
// Seed the queue directly with one closed and one open request.
let (pending_closed, pending_closed_rx) = get_block(1);
drop(pending_closed_rx);
overflow
.messages
.push_back(PendingMessage::Message(pending_closed));
let (pending_open, mut pending_open_rx) = get_info(2);
overflow
.messages
.push_back(PendingMessage::Message(pending_open));
// Submit a request that is already closed on arrival.
let (current_closed, current_closed_rx) = get_finalization(3);
drop(current_closed_rx);
<TestMessage as Policy>::handle(&mut overflow, current_closed);
// The `has_*` helpers only report entries whose receiver is still open.
assert!(!has_get_block(&overflow, 1));
assert!(has_get_info(&overflow, 2));
assert!(!has_get_finalization(&overflow, 3));
// The open request has neither been answered nor dropped.
assert!(matches!(
pending_open_rx.try_recv(),
Err(TryRecvError::Empty)
));
}
// Draining stops at the first message handed back by `push`; if that message's
// receiver closes in the meantime, a later drain skips it.
#[test]
fn policy_drain_stops_after_returned_response_closes() {
let mut overflow = pending();
let (first, first_rx) = get_block(1);
let (second, mut second_rx) = get_info(2);
overflow.messages.push_back(PendingMessage::Message(first));
overflow.messages.push_back(PendingMessage::Message(second));
let mut first_rx = Some(first_rx);
let mut attempts = 0;
// Reject every message, closing the first request's receiver on the first attempt.
overflow.drain(|message| {
attempts += 1;
drop(first_rx.take());
Some(message)
});
// Only one push attempt was made before draining stopped.
assert_eq!(attempts, 1);
// The second drain skips the now-closed first request and delivers the second.
let drained = drain(&mut overflow);
assert_eq!(drained.len(), 1);
assert!(matches!(
&drained[0],
TestMessage::GetInfo {
identifier: Identifier::Height(height),
response,
} if *height == Height::new(2) && !response.is_closed()
));
assert!(matches!(second_rx.try_recv(), Err(TryRecvError::Empty)));
}
// A hint merged into an existing one keeps the queue position of the first
// hint for that height.
#[test]
fn policy_keeps_coalesced_hints_in_fifo_position() {
let mut overflow = pending();
let first = public_key(1);
let second = public_key(2);
let (get_block_9, _get_block_9_rx) = get_block(9);
let (get_info_11, _get_info_11_rx) = get_info(11);
// Arrival order: request, hint, request, second hint for the same height.
<TestMessage as Policy>::handle(&mut overflow, get_block_9);
<TestMessage as Policy>::handle(&mut overflow, hint_finalized(10, first.clone()));
<TestMessage as Policy>::handle(&mut overflow, get_info_11);
<TestMessage as Policy>::handle(&mut overflow, hint_finalized(10, second.clone()));
let drained = drain(&mut overflow);
assert_eq!(drained.len(), 3);
assert!(matches!(
&drained[0],
TestMessage::GetBlock {
identifier: Identifier::Height(height),
..
} if *height == Height::new(9)
));
assert!(matches!(
&drained[2],
TestMessage::GetInfo {
identifier: Identifier::Height(height),
..
} if *height == Height::new(11)
));
// The merged hint sits between the two requests and carries both targets.
let TestMessage::HintFinalized { height, targets } = &drained[1] else {
panic!("expected hint");
};
assert_eq!(*height, Height::new(10));
assert_eq!(targets.len().get(), 2);
assert!(targets.contains(&first));
assert!(targets.contains(&second));
}
// Floor and prune updates are coalesced to their highest value and drained
// ahead of the queue, floor first.
#[test]
fn policy_keeps_highest_floor_and_prune() {
    let mut overflow = pending();
    for height in [5, 3, 8] {
        <TestMessage as Policy>::handle(&mut overflow, set_floor(height));
    }
    for height in [4, 2, 7] {
        <TestMessage as Policy>::handle(&mut overflow, prune(height));
    }

    let floor_round = overflow.floor.as_ref().map(Finalization::round);
    assert_eq!(floor_round, Some(round(8)));
    assert_eq!(overflow.prune, Some(Height::new(7)));
    // Neither kind of update occupies a queue slot.
    assert!(overflow.messages.is_empty());

    let drained = drain(&mut overflow);
    assert_eq!(drained.len(), 2);
    let floor_first = matches!(
        &drained[0],
        TestMessage::SetFloor { finalization } if finalization.round() == round(8)
    );
    assert!(floor_first);
    let prune_second = matches!(
        &drained[1],
        TestMessage::Prune { height } if *height == Height::new(7)
    );
    assert!(prune_second);
}
// Raising the prune height drops queued requests and hints that became stale,
// and the coalesced floor/prune are drained ahead of the surviving requests.
#[test]
fn policy_replaces_floor_and_prune_and_drops_stale_pending_on_drain() {
// Scenario 1: an existing floor is replaced and a first prune is applied.
let mut overflow = pending();
overflow.floor = Some(finalization(5));
let (get_info_4, _get_info_4_rx) = get_info(4);
let (get_block_7, _get_block_7_rx) = get_block(7);
let (get_block_8, _get_block_8_rx) = get_block(8);
overflow
.messages
.push_back(PendingMessage::Message(get_info_4));
overflow
.messages
.push_back(PendingMessage::Message(get_block_7));
overflow.hint_finalized(Height::new(8), NonEmptyVec::new(public_key(1)));
overflow
.messages
.push_back(PendingMessage::Message(get_block_8));
<TestMessage as Policy>::handle(&mut overflow, set_floor(8));
<TestMessage as Policy>::handle(&mut overflow, prune(8));
assert_eq!(
overflow.floor.as_ref().map(Finalization::round),
Some(round(8))
);
// Requests below 8 and the hint at 8 are gone; the request at 8 survives.
assert_eq!(overflow.messages.len(), 1);
assert!(!has_get_info(&overflow, 4));
assert!(!has_get_block(&overflow, 7));
assert!(has_get_block(&overflow, 8));
assert!(hint_targets(&overflow, 8).is_none());
let drained = drain(&mut overflow);
assert_eq!(drained.len(), 3);
assert!(matches!(
&drained[0],
TestMessage::SetFloor { finalization } if finalization.round() == round(8)
));
assert!(matches!(
&drained[1],
TestMessage::Prune { height } if *height == Height::new(8)
));
assert!(matches!(
&drained[2],
TestMessage::GetBlock {
identifier: Identifier::Height(height),
..
} if *height == Height::new(8)
));
// Scenario 2: an existing prune height is raised, with no floor pending.
let mut overflow = pending();
overflow.prune = Some(Height::new(5));
let (get_finalization_4, _get_finalization_4_rx) = get_finalization(4);
let (get_block_6, _get_block_6_rx) = get_block(6);
let (get_block_7, _get_block_7_rx) = get_block(7);
overflow
.messages
.push_back(PendingMessage::Message(get_finalization_4));
overflow
.messages
.push_back(PendingMessage::Message(get_block_6));
overflow.hint_finalized(Height::new(6), NonEmptyVec::new(public_key(2)));
overflow
.messages
.push_back(PendingMessage::Message(get_block_7));
<TestMessage as Policy>::handle(&mut overflow, prune(7));
assert_eq!(overflow.prune, Some(Height::new(7)));
// Everything below 7 is gone; the request at 7 survives.
assert_eq!(overflow.messages.len(), 1);
assert!(!has_get_finalization(&overflow, 4));
assert!(!has_get_block(&overflow, 6));
assert!(has_get_block(&overflow, 7));
assert!(hint_targets(&overflow, 6).is_none());
let drained = drain(&mut overflow);
assert_eq!(drained.len(), 2);
assert!(matches!(
&drained[0],
TestMessage::Prune { height } if *height == Height::new(7)
));
assert!(matches!(
&drained[1],
TestMessage::GetBlock {
identifier: Identifier::Height(height),
..
} if *height == Height::new(7)
));
}
// A prune that raises the height also removes queued requests whose receiver
// is gone, even when they are not stale.
#[test]
fn policy_prune_drops_closed_pending() {
// `GetBlock`: one closed and one open request, both above the prune height.
let mut overflow = pending();
let (closed_message, closed_rx) = get_block(8);
drop(closed_rx);
let (open_message, mut open_rx) = get_block(8);
overflow
.messages
.push_back(PendingMessage::Message(closed_message));
overflow
.messages
.push_back(PendingMessage::Message(open_message));
<TestMessage as Policy>::handle(&mut overflow, prune(7));
assert_eq!(overflow.messages.len(), 1);
assert!(has_get_block(&overflow, 8));
assert!(matches!(open_rx.try_recv(), Err(TryRecvError::Empty)));
// Same check for `GetFinalization`.
let mut overflow = pending();
let (closed_message, closed_rx) = get_finalization(8);
drop(closed_rx);
let (open_message, mut open_rx) = get_finalization(8);
overflow
.messages
.push_back(PendingMessage::Message(closed_message));
overflow
.messages
.push_back(PendingMessage::Message(open_message));
<TestMessage as Policy>::handle(&mut overflow, prune(7));
assert_eq!(overflow.messages.len(), 1);
assert!(has_get_finalization(&overflow, 8));
assert!(matches!(open_rx.try_recv(), Err(TryRecvError::Empty)));
}
// The retention pass only runs when the prune height actually increases.
#[test]
fn policy_skips_retain_when_prune_height_does_not_increase() {
    let mut overflow = pending();
    <TestMessage as Policy>::handle(&mut overflow, prune(10));

    // Queue a request whose receiver is already gone; only a retention pass
    // would remove it.
    let (closed_message, closed_rx) = get_block(11);
    drop(closed_rx);
    let stale_entry = PendingMessage::Message(closed_message);
    overflow.messages.push_back(stale_entry);

    // Neither a floor update nor a lower prune height triggers retention.
    for message in [set_floor(9), prune(9)] {
        <TestMessage as Policy>::handle(&mut overflow, message);
        assert_eq!(overflow.messages.len(), 1);
    }

    // A higher prune height does.
    <TestMessage as Policy>::handle(&mut overflow, prune(12));
    assert!(overflow.messages.is_empty());
}
// Requests below the pending prune height are dropped, both when the prune
// arrives after them and when they arrive after the prune.
#[test]
fn policy_drops_stale_requests_against_pending_floor_and_prune() {
let mut overflow = pending();
let (get_info_4, _get_info_4_rx) = get_info(4);
let (get_info_5, _get_info_5_rx) = get_info(5);
let (get_info_6, _get_info_6_rx) = get_info(6);
let (get_info_7, _get_info_7_rx) = get_info(7);
let (get_block_4, _get_block_4_rx) = get_block(4);
let (get_block_5, _get_block_5_rx) = get_block(5);
let (get_block_6, _get_block_6_rx) = get_block(6);
let (get_block_7, _get_block_7_rx) = get_block(7);
let (get_finalization_4, _get_finalization_4_rx) = get_finalization(4);
let (get_finalization_6, _get_finalization_6_rx) = get_finalization(6);
// Requests and hints submitted before the prune height is known.
<TestMessage as Policy>::handle(&mut overflow, set_floor(5));
<TestMessage as Policy>::handle(&mut overflow, get_info_4);
<TestMessage as Policy>::handle(&mut overflow, get_info_5);
<TestMessage as Policy>::handle(&mut overflow, get_block_4);
<TestMessage as Policy>::handle(&mut overflow, get_block_5);
<TestMessage as Policy>::handle(&mut overflow, get_finalization_4);
<TestMessage as Policy>::handle(&mut overflow, hint_finalized(5, public_key(1)));
<TestMessage as Policy>::handle(&mut overflow, hint_finalized(6, public_key(2)));
<TestMessage as Policy>::handle(&mut overflow, prune(7));
assert!(has_prune(&overflow, 7));
// Requests submitted after the prune: below 7 is rejected, 7 is accepted.
<TestMessage as Policy>::handle(&mut overflow, get_info_6);
<TestMessage as Policy>::handle(&mut overflow, get_finalization_6);
assert!(!has_get_finalization(&overflow, 6));
<TestMessage as Policy>::handle(&mut overflow, get_block_6);
<TestMessage as Policy>::handle(&mut overflow, get_info_7);
assert!(has_get_info(&overflow, 7));
<TestMessage as Policy>::handle(&mut overflow, get_block_7);
assert!(has_get_block(&overflow, 7));
// Only the floor, the prune and the two requests at height 7 remain.
let drained = drain(&mut overflow);
assert_eq!(drained.len(), 4);
assert!(matches!(
&drained[0],
TestMessage::SetFloor { finalization } if finalization.round() == round(5)
));
assert!(matches!(
&drained[1],
TestMessage::Prune { height } if *height == Height::new(7)
));
assert!(matches!(
&drained[2],
TestMessage::GetInfo {
identifier: Identifier::Height(height),
..
} if *height == Height::new(7)
));
assert!(matches!(
&drained[3],
TestMessage::GetBlock {
identifier: Identifier::Height(height),
..
} if *height == Height::new(7)
));
}
// `Proposed`, `Verified` and `Certified` messages are never dropped as stale,
// and their ack channels stay pending while they are queued.
#[test]
fn policy_keeps_block_messages_and_waiters() {
let mut overflow = pending();
let (proposed_message, mut proposed_ack) = proposed(4);
let (verified_message, mut verified_ack) = verified(6);
let (certified_message, mut certified_ack) = certified(8);
overflow
.messages
.push_back(PendingMessage::Message(proposed_message));
overflow
.messages
.push_back(PendingMessage::Message(verified_message));
overflow
.messages
.push_back(PendingMessage::Message(certified_message));
// A floor update leaves every block message queued and unacknowledged.
<TestMessage as Policy>::handle(&mut overflow, set_floor(7));
assert!(has_block_message(&overflow, 4));
assert!(has_block_message(&overflow, 6));
assert!(has_block_message(&overflow, 8));
assert!(matches!(proposed_ack.try_recv(), Err(TryRecvError::Empty)));
assert!(matches!(verified_ack.try_recv(), Err(TryRecvError::Empty)));
assert!(matches!(certified_ack.try_recv(), Err(TryRecvError::Empty)));
// A prune above the block heights does not remove them either.
<TestMessage as Policy>::handle(&mut overflow, prune(9));
assert!(has_block_message(&overflow, 8));
assert!(matches!(certified_ack.try_recv(), Err(TryRecvError::Empty)));
// A block message below the prune height is still accepted.
let (stale, mut stale_ack) = proposed(8);
<TestMessage as Policy>::handle(&mut overflow, stale);
assert!(has_block_message(&overflow, 8));
assert!(matches!(stale_ack.try_recv(), Err(TryRecvError::Empty)));
let (current, mut current_ack) = verified(9);
<TestMessage as Policy>::handle(&mut overflow, current);
assert!(has_block_message(&overflow, 9));
assert!(matches!(current_ack.try_recv(), Err(TryRecvError::Empty)));
// Floor and prune are still drained ahead of the queued block messages.
let drained = drain(&mut overflow);
assert!(matches!(drained[0], TestMessage::SetFloor { .. }));
assert!(matches!(drained[1], TestMessage::Prune { .. }));
}
}