1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
use futures_util::{Sink, Stream};
use crate::StdError;
/// Networking abstraction
///
/// Basically, it's pair of channels: [`Stream`] for receiving messages, and [`Sink`] for sending
/// messages to other parties.
pub trait Delivery<M> {
/// Outgoing delivery channel
type Send: Sink<Outgoing<M>, Error = Self::SendError> + Unpin;
/// Incoming delivery channel
type Receive: Stream<Item = Result<Incoming<M>, Self::ReceiveError>> + Unpin;
/// Error of outgoing delivery channel
type SendError: StdError + Send + Sync + 'static;
/// Error of incoming delivery channel
type ReceiveError: StdError + Send + Sync + 'static;
/// Returns a pair of incoming and outgoing delivery channels
fn split(self) -> (Self::Receive, Self::Send);
}
impl<M, I, O, IErr, OErr> Delivery<M> for (I, O)
where
I: Stream<Item = Result<Incoming<M>, IErr>> + Unpin,
O: Sink<Outgoing<M>, Error = OErr> + Unpin,
IErr: StdError + Send + Sync + 'static,
OErr: StdError + Send + Sync + 'static,
{
type Send = O;
type Receive = I;
type SendError = OErr;
type ReceiveError = IErr;
fn split(self) -> (Self::Receive, Self::Send) {
(self.0, self.1)
}
}
/// Incoming message
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct Incoming<M> {
/// Index of a message
pub id: MsgId,
/// Index of a party who sent the message
pub sender: PartyIndex,
/// Indicates whether it's a broadcast message (meaning that this message is received by all the
/// parties), or p2p (private message sent by `sender`)
pub msg_type: MessageType,
/// Received message
pub msg: M,
}
/// Message type (broadcast or p2p)
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum MessageType {
/// Message was broadcasted
Broadcast,
/// P2P message
P2P,
}
/// Index of party involved in the protocol
pub type PartyIndex = u16;
/// ID of received message
///
/// Can be used to retrieve extra information about message from delivery layer when needed.
/// E.g. if malicious party is detected, we need a proof that received message was sent by this
/// party, so message id should be used to retrieve signature and original message.
pub type MsgId = u64;
impl<M> Incoming<M> {
/// Maps `Incoming<M>` to `Incoming<T>` by applying a function to the message body
pub fn map<T, F>(self, f: F) -> Incoming<T>
where
F: FnOnce(M) -> T,
{
Incoming {
id: self.id,
sender: self.sender,
msg_type: self.msg_type,
msg: f(self.msg),
}
}
/// Maps `Incoming<M>` to `Result<Incoming<T>, E>` by applying a function `fn(M) -> Result<T, E>`
/// to the message body
pub fn try_map<T, E, F>(self, f: F) -> Result<Incoming<T>, E>
where
F: FnOnce(M) -> Result<T, E>,
{
Ok(Incoming {
id: self.id,
sender: self.sender,
msg_type: self.msg_type,
msg: f(self.msg)?,
})
}
/// Converts `&Incoming<M>` to `Incoming<&M>`
pub fn as_ref(&self) -> Incoming<&M> {
Incoming {
id: self.id,
sender: self.sender,
msg_type: self.msg_type,
msg: &self.msg,
}
}
/// Checks whether it's broadcast message
pub fn is_broadcast(&self) -> bool {
matches!(self.msg_type, MessageType::Broadcast { .. })
}
/// Checks whether it's p2p message
pub fn is_p2p(&self) -> bool {
matches!(self.msg_type, MessageType::P2P)
}
}
/// Outgoing message
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Outgoing<M> {
/// Message destination: either one party (p2p message) or all parties (broadcast message)
pub recipient: MessageDestination,
/// Message being sent
pub msg: M,
}
impl<M> Outgoing<M> {
/// Constructs an outgoing message addressed to all parties
pub fn broadcast(msg: M) -> Self {
Self {
recipient: MessageDestination::AllParties,
msg,
}
}
/// Constructs an outgoing message addressed to one party
pub fn p2p(recipient: PartyIndex, msg: M) -> Self {
Self {
recipient: MessageDestination::OneParty(recipient),
msg,
}
}
/// Maps `Outgoing<M>` to `Outgoing<M2>` by applying a function to the message body
pub fn map<M2, F>(self, f: F) -> Outgoing<M2>
where
F: FnOnce(M) -> M2,
{
Outgoing {
recipient: self.recipient,
msg: f(self.msg),
}
}
/// Converts `&Outgoing<M>` to `Outgoing<&M>`
pub fn as_ref(&self) -> Outgoing<&M> {
Outgoing {
recipient: self.recipient,
msg: &self.msg,
}
}
/// Checks whether it's broadcast message
pub fn is_broadcast(&self) -> bool {
self.recipient.is_broadcast()
}
/// Checks whether it's p2p message
pub fn is_p2p(&self) -> bool {
self.recipient.is_p2p()
}
}
/// Destination of an outgoing message
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum MessageDestination {
/// Broadcast message
AllParties,
/// P2P message
OneParty(PartyIndex),
}
impl MessageDestination {
/// Returns `true` if it's p2p message
pub fn is_p2p(&self) -> bool {
matches!(self, MessageDestination::OneParty(_))
}
/// Returns `true` if it's broadcast message
pub fn is_broadcast(&self) -> bool {
matches!(self, MessageDestination::AllParties { .. })
}
}