use futures_util::{Sink, Stream};
pub trait Delivery<M> {
type Send: Sink<Outgoing<M>, Error = Self::SendError> + Unpin;
type Receive: Stream<Item = Result<Incoming<M>, Self::ReceiveError>> + Unpin;
type SendError: core::error::Error + Send + Sync + 'static;
type ReceiveError: core::error::Error + Send + Sync + 'static;
fn split(self) -> (Self::Receive, Self::Send);
}
impl<M, I, O, IErr, OErr> Delivery<M> for (I, O)
where
I: Stream<Item = Result<Incoming<M>, IErr>> + Unpin,
O: Sink<Outgoing<M>, Error = OErr> + Unpin,
IErr: core::error::Error + Send + Sync + 'static,
OErr: core::error::Error + Send + Sync + 'static,
{
type Send = O;
type Receive = I;
type SendError = OErr;
type ReceiveError = IErr;
fn split(self) -> (Self::Receive, Self::Send) {
(self.0, self.1)
}
}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct Incoming<M> {
pub id: MsgId,
pub sender: PartyIndex,
pub msg_type: MessageType,
pub msg: M,
}
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum MessageType {
Broadcast,
P2P,
}
pub type PartyIndex = u16;
pub type MsgId = u64;
impl<M> Incoming<M> {
pub fn map<T, F>(self, f: F) -> Incoming<T>
where
F: FnOnce(M) -> T,
{
Incoming {
id: self.id,
sender: self.sender,
msg_type: self.msg_type,
msg: f(self.msg),
}
}
pub fn try_map<T, E, F>(self, f: F) -> Result<Incoming<T>, E>
where
F: FnOnce(M) -> Result<T, E>,
{
Ok(Incoming {
id: self.id,
sender: self.sender,
msg_type: self.msg_type,
msg: f(self.msg)?,
})
}
pub fn as_ref(&self) -> Incoming<&M> {
Incoming {
id: self.id,
sender: self.sender,
msg_type: self.msg_type,
msg: &self.msg,
}
}
pub fn is_broadcast(&self) -> bool {
matches!(self.msg_type, MessageType::Broadcast { .. })
}
pub fn is_p2p(&self) -> bool {
matches!(self.msg_type, MessageType::P2P)
}
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Outgoing<M> {
pub recipient: MessageDestination,
pub msg: M,
}
impl<M> Outgoing<M> {
pub fn broadcast(msg: M) -> Self {
Self {
recipient: MessageDestination::AllParties,
msg,
}
}
pub fn p2p(recipient: PartyIndex, msg: M) -> Self {
Self {
recipient: MessageDestination::OneParty(recipient),
msg,
}
}
pub fn map<M2, F>(self, f: F) -> Outgoing<M2>
where
F: FnOnce(M) -> M2,
{
Outgoing {
recipient: self.recipient,
msg: f(self.msg),
}
}
pub fn as_ref(&self) -> Outgoing<&M> {
Outgoing {
recipient: self.recipient,
msg: &self.msg,
}
}
pub fn is_broadcast(&self) -> bool {
self.recipient.is_broadcast()
}
pub fn is_p2p(&self) -> bool {
self.recipient.is_p2p()
}
}
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum MessageDestination {
AllParties,
OneParty(PartyIndex),
}
impl MessageDestination {
pub fn is_p2p(&self) -> bool {
matches!(self, MessageDestination::OneParty(_))
}
pub fn is_broadcast(&self) -> bool {
matches!(self, MessageDestination::AllParties { .. })
}
}