use crate::{
marshal::coding::types::CodedBlock,
types::{coding::Commitment, Round},
CertifiableBlock,
};
use commonware_coding::Scheme as CodingScheme;
use commonware_cryptography::{Hasher, PublicKey};
use commonware_utils::channel::{fallible::AsyncFallibleExt, mpsc, oneshot};
use std::sync::Arc;
/// Requests and notifications carried over the channel wrapped by [`Mailbox`].
///
/// Variants that hold a `response` sender give the receiving side a oneshot
/// channel to answer on; the other variants carry no reply path.
pub enum Message<B, C, H, P>
where
    B: CertifiableBlock,
    C: CodingScheme,
    H: Hasher,
    P: PublicKey,
{
    /// A coded block that was proposed, tagged with its round.
    Proposed {
        /// The proposed block in its coded form.
        block: CodedBlock<B, C, H>,
        /// The round attached to the proposal.
        round: Round,
    },
    /// A commitment that was discovered, tagged with a leader and a round.
    Discovered {
        /// The discovered commitment.
        commitment: Commitment,
        /// Public key supplied with the notification.
        // NOTE(review): presumably the leader of `round`; confirm against the receiver.
        leader: P,
        /// The round attached to the notification.
        round: Round,
    },
    /// Lookup of a coded block keyed by commitment.
    GetByCommitment {
        /// The commitment to look up.
        commitment: Commitment,
        /// Reply path; the reply type allows `None`, i.e. no block returned.
        response: oneshot::Sender<Option<Arc<CodedBlock<B, C, H>>>>,
    },
    /// Lookup of a coded block keyed by block digest.
    GetByDigest {
        /// The digest to look up.
        digest: B::Digest,
        /// Reply path; the reply type allows `None`, i.e. no block returned.
        response: oneshot::Sender<Option<Arc<CodedBlock<B, C, H>>>>,
    },
    /// Request for a payload-free signal concerning `commitment`.
    // NOTE(review): judging by the variant name, the signal is sent once the
    // shard assigned to this participant has been verified; confirm against
    // the receiver.
    SubscribeAssignedShardVerified {
        /// The commitment the subscription refers to.
        commitment: Commitment,
        /// Reply path carrying only a unit value.
        response: oneshot::Sender<()>,
    },
    /// Request for the coded block matching `commitment` to be sent on `response`.
    ///
    /// Unlike [`Message::GetByCommitment`], the reply type is not optional.
    SubscribeByCommitment {
        /// The commitment the subscription refers to.
        commitment: Commitment,
        /// Reply path carrying the block.
        response: oneshot::Sender<Arc<CodedBlock<B, C, H>>>,
    },
    /// Request for the coded block matching `digest` to be sent on `response`.
    ///
    /// Unlike [`Message::GetByDigest`], the reply type is not optional.
    SubscribeByDigest {
        /// The digest the subscription refers to.
        digest: B::Digest,
        /// Reply path carrying the block.
        response: oneshot::Sender<Arc<CodedBlock<B, C, H>>>,
    },
    /// Pruning request bounded by a commitment.
    // NOTE(review): what is pruned, and whether `through` itself is included,
    // is decided by the receiver and is not visible here.
    Prune {
        /// The commitment bounding the prune.
        through: Commitment,
    },
}
/// Cloneable handle that wraps the sending half of a channel of [`Message`]s.
#[derive(Clone)]
pub struct Mailbox<B, C, H, P>
where
    B: CertifiableBlock,
    C: CodingScheme,
    H: Hasher,
    P: PublicKey,
{
    /// Sending half used to enqueue messages; also visible to the parent module.
    pub(super) sender: mpsc::Sender<Message<B, C, H, P>>,
}
impl<B, C, H, P> Mailbox<B, C, H, P>
where
    B: CertifiableBlock,
    C: CodingScheme,
    H: Hasher,
    P: PublicKey,
{
    /// Wraps `sender` in a mailbox.
    pub const fn new(sender: mpsc::Sender<Message<B, C, H, P>>) -> Self {
        Self { sender }
    }

    /// Enqueues a [`Message::Proposed`] carrying `block` and `round`.
    ///
    /// Whatever `send_lossy` reports is discarded, so the caller is not told
    /// whether the message was accepted.
    pub async fn proposed(&self, round: Round, block: CodedBlock<B, C, H>) {
        self.sender
            .send_lossy(Message::Proposed { block, round })
            .await;
    }

    /// Enqueues a [`Message::Discovered`] carrying `commitment`, `leader` and `round`.
    ///
    /// Whatever `send_lossy` reports is discarded, so the caller is not told
    /// whether the message was accepted.
    pub async fn discovered(&self, commitment: Commitment, leader: P, round: Round) {
        self.sender
            .send_lossy(Message::Discovered {
                commitment,
                leader,
                round,
            })
            .await;
    }

    /// Requests the coded block for `commitment` and waits for the reply.
    ///
    /// Returns `None` both when the request itself yields no reply and when
    /// the reply is `None`; the two cases are collapsed into one.
    pub async fn get(&self, commitment: Commitment) -> Option<Arc<CodedBlock<B, C, H>>> {
        let reply = self
            .sender
            .request(|response| Message::GetByCommitment {
                commitment,
                response,
            })
            .await;
        reply.flatten()
    }

    /// Requests the coded block for `digest` and waits for the reply.
    ///
    /// Returns `None` both when the request itself yields no reply and when
    /// the reply is `None`; the two cases are collapsed into one.
    pub async fn get_by_digest(&self, digest: B::Digest) -> Option<Arc<CodedBlock<B, C, H>>> {
        let reply = self
            .sender
            .request(|response| Message::GetByDigest { digest, response })
            .await;
        reply.flatten()
    }

    /// Enqueues a [`Message::SubscribeAssignedShardVerified`] for `commitment`
    /// and returns the receiving half of the freshly created oneshot channel.
    ///
    /// The receiver is returned whether or not the message was accepted.
    // NOTE(review): if the message was not accepted, the receiver presumably
    // resolves with an error once the sender half is dropped; confirm.
    pub async fn subscribe_assigned_shard_verified(
        &self,
        commitment: Commitment,
    ) -> oneshot::Receiver<()> {
        let (response, signal) = oneshot::channel();
        self.sender
            .send_lossy(Message::SubscribeAssignedShardVerified {
                commitment,
                response,
            })
            .await;
        signal
    }

    /// Enqueues a [`Message::SubscribeByCommitment`] for `commitment` and
    /// returns the receiving half of the freshly created oneshot channel.
    ///
    /// The receiver is returned whether or not the message was accepted.
    pub async fn subscribe(
        &self,
        commitment: Commitment,
    ) -> oneshot::Receiver<Arc<CodedBlock<B, C, H>>> {
        let (response, delivery) = oneshot::channel();
        self.sender
            .send_lossy(Message::SubscribeByCommitment {
                commitment,
                response,
            })
            .await;
        delivery
    }

    /// Enqueues a [`Message::SubscribeByDigest`] for `digest` and returns the
    /// receiving half of the freshly created oneshot channel.
    ///
    /// The receiver is returned whether or not the message was accepted.
    pub async fn subscribe_by_digest(
        &self,
        digest: B::Digest,
    ) -> oneshot::Receiver<Arc<CodedBlock<B, C, H>>> {
        let (response, delivery) = oneshot::channel();
        self.sender
            .send_lossy(Message::SubscribeByDigest { digest, response })
            .await;
        delivery
    }

    /// Enqueues a [`Message::Prune`] bounded by `through`.
    ///
    /// Whatever `send_lossy` reports is discarded, so the caller is not told
    /// whether the message was accepted.
    pub async fn prune(&self, through: Commitment) {
        self.sender.send_lossy(Message::Prune { through }).await;
    }
}