use crate::{
marshal::coding::types::CodedBlock,
types::{coding::Commitment, Round},
CertifiableBlock,
};
use commonware_actor::mailbox::{Overflow, Policy, Sender};
use commonware_coding::Scheme as CodingScheme;
use commonware_cryptography::{Hasher, PublicKey};
use commonware_utils::channel::oneshot;
use std::collections::VecDeque;
/// Messages enqueued by the [`Mailbox`] defined in this file.
///
/// Variants that hold a `response` sender are requests whose answer is
/// delivered over that oneshot channel; the remaining variants carry no
/// reply channel.
pub(crate) enum Message<B, C, H, P>
where
B: CertifiableBlock,
C: CodingScheme,
H: Hasher,
P: PublicKey,
{
/// Built by [`Mailbox::proposed`]: a coded block together with a round.
Proposed {
/// The coded block passed to [`Mailbox::proposed`].
block: CodedBlock<B, C, H>,
/// The round passed alongside the block.
round: Round,
},
/// Built by [`Mailbox::discovered`]: a commitment, a public key and a round.
Discovered {
/// The commitment passed to [`Mailbox::discovered`].
commitment: Commitment,
/// Public key supplied as `leader`.
/// NOTE(review): presumably the leader for `round` -- confirm against the caller.
leader: P,
/// The round passed alongside the commitment.
round: Round,
},
/// Built by [`Mailbox::notarized`]: a commitment and a round.
Notarized {
/// The commitment passed to [`Mailbox::notarized`].
commitment: Commitment,
/// The round passed alongside the commitment.
round: Round,
},
/// Built by [`Mailbox::get`]: a lookup keyed by commitment.
GetByCommitment {
/// The commitment to look up.
commitment: Commitment,
/// Channel for the answer; carries `None` or `Some(block)`.
response: oneshot::Sender<Option<CodedBlock<B, C, H>>>,
},
/// Built by [`Mailbox::get_by_digest`]: a lookup keyed by block digest.
GetByDigest {
/// The digest to look up.
digest: B::Digest,
/// Channel for the answer; carries `None` or `Some(block)`.
response: oneshot::Sender<Option<CodedBlock<B, C, H>>>,
},
/// Built by [`Mailbox::subscribe_assigned_shard_verified`].
/// NOTE(review): presumably signalled once the shard assigned to this
/// participant for `commitment` has been verified -- confirm against the handler.
SubscribeAssignedShardVerified {
/// The commitment the subscription refers to.
commitment: Commitment,
/// Channel for the notification; carries no payload.
response: oneshot::Sender<()>,
},
/// Built by [`Mailbox::subscribe`]: a subscription keyed by commitment.
SubscribeByCommitment {
/// The commitment the subscription refers to.
commitment: Commitment,
/// Channel over which a coded block is delivered.
response: oneshot::Sender<CodedBlock<B, C, H>>,
},
/// Built by [`Mailbox::subscribe_by_digest`]: a subscription keyed by block digest.
SubscribeByDigest {
/// The digest the subscription refers to.
digest: B::Digest,
/// Channel over which a coded block is delivered.
response: oneshot::Sender<CodedBlock<B, C, H>>,
},
/// Built by [`Mailbox::prune`].
/// NOTE(review): presumably asks the receiver to discard state up to
/// `through` -- confirm the exact boundary against the handler.
Prune {
/// The commitment passed to [`Mailbox::prune`].
through: Commitment,
},
}
impl<B, C, H, P> Message<B, C, H, P>
where
    B: CertifiableBlock,
    C: CodingScheme,
    H: Hasher,
    P: PublicKey,
{
    /// Reports whether this message's reply channel is already closed.
    ///
    /// Returns `true` only when the message carries a `response` sender and
    /// that sender's `is_closed()` returns `true`. Messages without a
    /// `response` sender always return `false`.
    pub(crate) fn response_closed(&self) -> bool {
        match self {
            // Variants without a reply channel can never be "closed".
            Self::Proposed { .. } => false,
            Self::Discovered { .. } => false,
            Self::Notarized { .. } => false,
            Self::Prune { .. } => false,
            // Point lookups answering with `Option<CodedBlock>`.
            Self::GetByCommitment { response: reply, .. } => reply.is_closed(),
            Self::GetByDigest { response: reply, .. } => reply.is_closed(),
            // Subscription answering with `()`.
            Self::SubscribeAssignedShardVerified { response: reply, .. } => reply.is_closed(),
            // Subscriptions answering with a `CodedBlock`.
            Self::SubscribeByCommitment { response: reply, .. } => reply.is_closed(),
            Self::SubscribeByDigest { response: reply, .. } => reply.is_closed(),
        }
    }
}
/// Queue of [`Message`]s used as the `Overflow` state of the mailbox
/// `Policy` implemented for [`Message`] in this file.
///
/// `Policy::handle` appends at the back and `Overflow::drain` takes from the
/// front, so messages leave in the order they were stored.
pub(crate) struct Pending<B, C, H, P>(VecDeque<Message<B, C, H, P>>)
where
B: CertifiableBlock,
C: CodingScheme,
H: Hasher,
P: PublicKey;
impl<B, C, H, P> Default for Pending<B, C, H, P>
where
    B: CertifiableBlock,
    C: CodingScheme,
    H: Hasher,
    P: PublicKey,
{
    /// Creates a pending queue that holds no messages.
    fn default() -> Self {
        Self(VecDeque::default())
    }
}
impl<B, C, H, P> Overflow<Message<B, C, H, P>> for Pending<B, C, H, P>
where
    B: CertifiableBlock,
    C: CodingScheme,
    H: Hasher,
    P: PublicKey,
{
    /// Returns `true` when no messages are queued.
    fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Hands queued messages to `push`, oldest first.
    ///
    /// Messages whose reply channel is already closed are discarded without
    /// being offered to `push`. If `push` gives a message back (`Some`), that
    /// message is restored to the front of the queue and draining stops.
    fn drain<F>(&mut self, mut push: F)
    where
        F: FnMut(Message<B, C, H, P>) -> Option<Message<B, C, H, P>>,
    {
        loop {
            let candidate = match self.0.pop_front() {
                Some(candidate) => candidate,
                // Queue exhausted: nothing left to hand over.
                None => return,
            };
            if candidate.response_closed() {
                // `is_closed()` is true on the reply sender; drop the request.
                continue;
            }
            match push(candidate) {
                None => {}
                Some(rejected) => {
                    // Put the message back at the front so ordering is kept.
                    self.0.push_front(rejected);
                    return;
                }
            }
        }
    }
}
impl<B, C, H, P> Policy for Message<B, C, H, P>
where
    B: CertifiableBlock,
    C: CodingScheme,
    H: Hasher,
    P: PublicKey,
{
    type Overflow = Pending<B, C, H, P>;

    /// Stores `message` at the back of the overflow queue.
    ///
    /// A message whose reply channel is already closed is dropped instead of
    /// being stored.
    fn handle(overflow: &mut Self::Overflow, message: Self) {
        if !message.response_closed() {
            overflow.0.push_back(message);
        }
    }
}
/// Cloneable handle that enqueues [`Message`]s through a [`Sender`].
#[derive(Clone)]
pub struct Mailbox<B, C, H, P>
where
B: CertifiableBlock,
C: CodingScheme,
H: Hasher,
P: PublicKey,
{
/// Sender onto which every method of this mailbox enqueues its message.
pub(super) sender: Sender<Message<B, C, H, P>>,
}
impl<B, C, H, P> Mailbox<B, C, H, P>
where
    B: CertifiableBlock,
    C: CodingScheme,
    H: Hasher,
    P: PublicKey,
{
    /// Wraps `sender` in a [`Mailbox`].
    pub(crate) const fn new(sender: Sender<Message<B, C, H, P>>) -> Self {
        Self { sender }
    }

    /// Enqueues a [`Message::Proposed`] carrying `block` and `round`.
    ///
    /// The value returned by `enqueue` is deliberately discarded.
    pub fn proposed(&self, round: Round, block: CodedBlock<B, C, H>) {
        let message = Message::Proposed { block, round };
        let _ = self.sender.enqueue(message);
    }

    /// Enqueues a [`Message::Discovered`] carrying `commitment`, `leader`
    /// and `round`.
    ///
    /// The value returned by `enqueue` is deliberately discarded.
    pub fn discovered(&self, commitment: Commitment, leader: P, round: Round) {
        let message = Message::Discovered {
            commitment,
            leader,
            round,
        };
        let _ = self.sender.enqueue(message);
    }

    /// Enqueues a [`Message::Notarized`] carrying `commitment` and `round`.
    ///
    /// The value returned by `enqueue` is deliberately discarded.
    pub fn notarized(&self, commitment: Commitment, round: Round) {
        let message = Message::Notarized { commitment, round };
        let _ = self.sender.enqueue(message);
    }

    /// Requests the coded block for `commitment` and waits for the answer.
    ///
    /// Returns the value sent over the reply channel, or `None` if awaiting
    /// the reply channel yields an error.
    pub async fn get(&self, commitment: Commitment) -> Option<CodedBlock<B, C, H>> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = Message::GetByCommitment {
            commitment,
            response: reply_tx,
        };
        let _ = self.sender.enqueue(request);
        // A failed receive is reported the same way as an explicit `None`.
        reply_rx.await.unwrap_or_default()
    }

    /// Requests the coded block for `digest` and waits for the answer.
    ///
    /// Returns the value sent over the reply channel, or `None` if awaiting
    /// the reply channel yields an error.
    pub async fn get_by_digest(&self, digest: B::Digest) -> Option<CodedBlock<B, C, H>> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = Message::GetByDigest {
            digest,
            response: reply_tx,
        };
        let _ = self.sender.enqueue(request);
        // A failed receive is reported the same way as an explicit `None`.
        reply_rx.await.unwrap_or_default()
    }

    /// Enqueues a [`Message::SubscribeAssignedShardVerified`] for `commitment`
    /// and returns the receiving half of its reply channel.
    pub fn subscribe_assigned_shard_verified(
        &self,
        commitment: Commitment,
    ) -> oneshot::Receiver<()> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = Message::SubscribeAssignedShardVerified {
            commitment,
            response: reply_tx,
        };
        let _ = self.sender.enqueue(request);
        reply_rx
    }

    /// Enqueues a [`Message::SubscribeByCommitment`] for `commitment` and
    /// returns the receiving half of its reply channel.
    pub fn subscribe(&self, commitment: Commitment) -> oneshot::Receiver<CodedBlock<B, C, H>> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = Message::SubscribeByCommitment {
            commitment,
            response: reply_tx,
        };
        let _ = self.sender.enqueue(request);
        reply_rx
    }

    /// Enqueues a [`Message::SubscribeByDigest`] for `digest` and returns the
    /// receiving half of its reply channel.
    pub fn subscribe_by_digest(&self, digest: B::Digest) -> oneshot::Receiver<CodedBlock<B, C, H>> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = Message::SubscribeByDigest {
            digest,
            response: reply_tx,
        };
        let _ = self.sender.enqueue(request);
        reply_rx
    }

    /// Enqueues a [`Message::Prune`] carrying `through`.
    ///
    /// The value returned by `enqueue` is deliberately discarded.
    pub fn prune(&self, through: Commitment) {
        let message = Message::Prune { through };
        let _ = self.sender.enqueue(message);
    }
}