round_based/
delivery.rs

1use futures_util::{Sink, Stream};
2
3/// Networking abstraction
4///
5/// Basically, it's pair of channels: [`Stream`] for receiving messages, and [`Sink`] for sending
6/// messages to other parties.
7pub trait Delivery<M> {
8    /// Outgoing delivery channel
9    type Send: Sink<Outgoing<M>, Error = Self::SendError> + Unpin;
10    /// Incoming delivery channel
11    type Receive: Stream<Item = Result<Incoming<M>, Self::ReceiveError>> + Unpin;
12    /// Error of outgoing delivery channel
13    type SendError: core::error::Error + Send + Sync + 'static;
14    /// Error of incoming delivery channel
15    type ReceiveError: core::error::Error + Send + Sync + 'static;
16    /// Returns a pair of incoming and outgoing delivery channels
17    fn split(self) -> (Self::Receive, Self::Send);
18}
19
20impl<M, I, O, IErr, OErr> Delivery<M> for (I, O)
21where
22    I: Stream<Item = Result<Incoming<M>, IErr>> + Unpin,
23    O: Sink<Outgoing<M>, Error = OErr> + Unpin,
24    IErr: core::error::Error + Send + Sync + 'static,
25    OErr: core::error::Error + Send + Sync + 'static,
26{
27    type Send = O;
28    type Receive = I;
29    type SendError = OErr;
30    type ReceiveError = IErr;
31
32    fn split(self) -> (Self::Receive, Self::Send) {
33        (self.0, self.1)
34    }
35}
36
37/// Incoming message
38#[derive(Debug, Clone, Copy, Eq, PartialEq)]
39pub struct Incoming<M> {
40    /// Index of a message
41    pub id: MsgId,
42    /// Index of a party who sent the message
43    pub sender: PartyIndex,
44    /// Indicates whether it's a broadcast message (meaning that this message is received by all the
45    /// parties), or p2p (private message sent by `sender`)
46    pub msg_type: MessageType,
47    /// Received message
48    pub msg: M,
49}
50
/// Kind of a message: broadcast or p2p
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageType {
    /// The message was broadcast
    Broadcast,
    /// The message was sent p2p
    P2P,
}
59
/// Index of a party that takes part in the protocol
pub type PartyIndex = u16;
/// ID of a received message
///
/// The ID can be used to retrieve extra information about the message from the delivery layer
/// when needed. For example, if a malicious party is detected, we need a proof that the received
/// message was sent by this party, so the message ID should be used to retrieve the signature
/// and the original message.
pub type MsgId = u64;
68
69impl<M> Incoming<M> {
70    /// Maps `Incoming<M>` to `Incoming<T>` by applying a function to the message body
71    pub fn map<T, F>(self, f: F) -> Incoming<T>
72    where
73        F: FnOnce(M) -> T,
74    {
75        Incoming {
76            id: self.id,
77            sender: self.sender,
78            msg_type: self.msg_type,
79            msg: f(self.msg),
80        }
81    }
82
83    /// Maps `Incoming<M>` to `Result<Incoming<T>, E>` by applying a function `fn(M) -> Result<T, E>`
84    /// to the message body
85    pub fn try_map<T, E, F>(self, f: F) -> Result<Incoming<T>, E>
86    where
87        F: FnOnce(M) -> Result<T, E>,
88    {
89        Ok(Incoming {
90            id: self.id,
91            sender: self.sender,
92            msg_type: self.msg_type,
93            msg: f(self.msg)?,
94        })
95    }
96
97    /// Converts `&Incoming<M>` to `Incoming<&M>`
98    pub fn as_ref(&self) -> Incoming<&M> {
99        Incoming {
100            id: self.id,
101            sender: self.sender,
102            msg_type: self.msg_type,
103            msg: &self.msg,
104        }
105    }
106
107    /// Checks whether it's broadcast message
108    pub fn is_broadcast(&self) -> bool {
109        matches!(self.msg_type, MessageType::Broadcast { .. })
110    }
111
112    /// Checks whether it's p2p message
113    pub fn is_p2p(&self) -> bool {
114        matches!(self.msg_type, MessageType::P2P)
115    }
116}
117
118/// Outgoing message
119#[derive(Debug, Clone, Copy, PartialEq)]
120pub struct Outgoing<M> {
121    /// Message destination: either one party (p2p message) or all parties (broadcast message)
122    pub recipient: MessageDestination,
123    /// Message being sent
124    pub msg: M,
125}
126
127impl<M> Outgoing<M> {
128    /// Constructs an outgoing message addressed to all parties
129    pub fn broadcast(msg: M) -> Self {
130        Self {
131            recipient: MessageDestination::AllParties,
132            msg,
133        }
134    }
135
136    /// Constructs an outgoing message addressed to one party
137    pub fn p2p(recipient: PartyIndex, msg: M) -> Self {
138        Self {
139            recipient: MessageDestination::OneParty(recipient),
140            msg,
141        }
142    }
143
144    /// Maps `Outgoing<M>` to `Outgoing<M2>` by applying a function to the message body
145    pub fn map<M2, F>(self, f: F) -> Outgoing<M2>
146    where
147        F: FnOnce(M) -> M2,
148    {
149        Outgoing {
150            recipient: self.recipient,
151            msg: f(self.msg),
152        }
153    }
154
155    /// Converts `&Outgoing<M>` to `Outgoing<&M>`
156    pub fn as_ref(&self) -> Outgoing<&M> {
157        Outgoing {
158            recipient: self.recipient,
159            msg: &self.msg,
160        }
161    }
162
163    /// Checks whether it's broadcast message
164    pub fn is_broadcast(&self) -> bool {
165        self.recipient.is_broadcast()
166    }
167
168    /// Checks whether it's p2p message
169    pub fn is_p2p(&self) -> bool {
170        self.recipient.is_p2p()
171    }
172}
173
174/// Destination of an outgoing message
175#[derive(Debug, Copy, Clone, Eq, PartialEq)]
176pub enum MessageDestination {
177    /// Broadcast message
178    AllParties,
179    /// P2P message
180    OneParty(PartyIndex),
181}
182
183impl MessageDestination {
184    /// Returns `true` if it's p2p message
185    pub fn is_p2p(&self) -> bool {
186        matches!(self, MessageDestination::OneParty(_))
187    }
188    /// Returns `true` if it's broadcast message
189    pub fn is_broadcast(&self) -> bool {
190        matches!(self, MessageDestination::AllParties { .. })
191    }
192}