1use futures_util::{Sink, Stream};
2
3pub trait Delivery<M> {
8 type Send: Sink<Outgoing<M>, Error = Self::SendError> + Unpin;
10 type Receive: Stream<Item = Result<Incoming<M>, Self::ReceiveError>> + Unpin;
12 type SendError: core::error::Error + Send + Sync + 'static;
14 type ReceiveError: core::error::Error + Send + Sync + 'static;
16 fn split(self) -> (Self::Receive, Self::Send);
18}
19
20impl<M, I, O, IErr, OErr> Delivery<M> for (I, O)
21where
22 I: Stream<Item = Result<Incoming<M>, IErr>> + Unpin,
23 O: Sink<Outgoing<M>, Error = OErr> + Unpin,
24 IErr: core::error::Error + Send + Sync + 'static,
25 OErr: core::error::Error + Send + Sync + 'static,
26{
27 type Send = O;
28 type Receive = I;
29 type SendError = OErr;
30 type ReceiveError = IErr;
31
32 fn split(self) -> (Self::Receive, Self::Send) {
33 (self.0, self.1)
34 }
35}
36
37#[derive(Debug, Clone, Copy, Eq, PartialEq)]
39pub struct Incoming<M> {
40 pub id: MsgId,
42 pub sender: PartyIndex,
44 pub msg_type: MessageType,
47 pub msg: M,
49}
50
51#[derive(Debug, Copy, Clone, Eq, PartialEq)]
53pub enum MessageType {
54 Broadcast,
56 P2P,
58}
59
60pub type PartyIndex = u16;
62pub type MsgId = u64;
68
69impl<M> Incoming<M> {
70 pub fn map<T, F>(self, f: F) -> Incoming<T>
72 where
73 F: FnOnce(M) -> T,
74 {
75 Incoming {
76 id: self.id,
77 sender: self.sender,
78 msg_type: self.msg_type,
79 msg: f(self.msg),
80 }
81 }
82
83 pub fn try_map<T, E, F>(self, f: F) -> Result<Incoming<T>, E>
86 where
87 F: FnOnce(M) -> Result<T, E>,
88 {
89 Ok(Incoming {
90 id: self.id,
91 sender: self.sender,
92 msg_type: self.msg_type,
93 msg: f(self.msg)?,
94 })
95 }
96
97 pub fn as_ref(&self) -> Incoming<&M> {
99 Incoming {
100 id: self.id,
101 sender: self.sender,
102 msg_type: self.msg_type,
103 msg: &self.msg,
104 }
105 }
106
107 pub fn is_broadcast(&self) -> bool {
109 matches!(self.msg_type, MessageType::Broadcast { .. })
110 }
111
112 pub fn is_p2p(&self) -> bool {
114 matches!(self.msg_type, MessageType::P2P)
115 }
116}
117
118#[derive(Debug, Clone, Copy, PartialEq)]
120pub struct Outgoing<M> {
121 pub recipient: MessageDestination,
123 pub msg: M,
125}
126
127impl<M> Outgoing<M> {
128 pub fn broadcast(msg: M) -> Self {
130 Self {
131 recipient: MessageDestination::AllParties,
132 msg,
133 }
134 }
135
136 pub fn p2p(recipient: PartyIndex, msg: M) -> Self {
138 Self {
139 recipient: MessageDestination::OneParty(recipient),
140 msg,
141 }
142 }
143
144 pub fn map<M2, F>(self, f: F) -> Outgoing<M2>
146 where
147 F: FnOnce(M) -> M2,
148 {
149 Outgoing {
150 recipient: self.recipient,
151 msg: f(self.msg),
152 }
153 }
154
155 pub fn as_ref(&self) -> Outgoing<&M> {
157 Outgoing {
158 recipient: self.recipient,
159 msg: &self.msg,
160 }
161 }
162
163 pub fn is_broadcast(&self) -> bool {
165 self.recipient.is_broadcast()
166 }
167
168 pub fn is_p2p(&self) -> bool {
170 self.recipient.is_p2p()
171 }
172}
173
174#[derive(Debug, Copy, Clone, Eq, PartialEq)]
176pub enum MessageDestination {
177 AllParties,
179 OneParty(PartyIndex),
181}
182
183impl MessageDestination {
184 pub fn is_p2p(&self) -> bool {
186 matches!(self, MessageDestination::OneParty(_))
187 }
188 pub fn is_broadcast(&self) -> bool {
190 matches!(self, MessageDestination::AllParties { .. })
191 }
192}