// Message delivery layer: the `Delivery` trait and the `Incoming` / `Outgoing` message types.
use futures_util::{Sink, Stream};

use crate::StdError;

/// Networking abstraction
///
/// A delivery is a pair of channels: a [`Stream`] that yields messages received from other
/// parties, and a [`Sink`] that accepts messages to be sent to other parties.
pub trait Delivery<M> {
    /// Error produced by the incoming channel
    type ReceiveError: StdError + Send + Sync + 'static;
    /// Error produced by the outgoing channel
    type SendError: StdError + Send + Sync + 'static;
    /// Incoming channel: a stream whose items are either a received [`Incoming`] message or a
    /// `Self::ReceiveError`
    type Receive: Stream<Item = Result<Incoming<M>, Self::ReceiveError>> + Unpin;
    /// Outgoing channel: a sink that accepts [`Outgoing`] messages and fails with
    /// `Self::SendError`
    type Send: Sink<Outgoing<M>, Error = Self::SendError> + Unpin;

    /// Consumes the delivery and returns its `(incoming, outgoing)` pair of channels
    fn split(self) -> (Self::Receive, Self::Send);
}

impl<M, I, O, IErr, OErr> Delivery<M> for (I, O)
where
    I: Stream<Item = Result<Incoming<M>, IErr>> + Unpin,
    O: Sink<Outgoing<M>, Error = OErr> + Unpin,
    IErr: StdError + Send + Sync + 'static,
    OErr: StdError + Send + Sync + 'static,
{
    type Send = O;
    type Receive = I;
    type SendError = OErr;
    type ReceiveError = IErr;

    fn split(self) -> (Self::Receive, Self::Send) {
        (self.0, self.1)
    }
}

/// Message received from another party
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Incoming<M> {
    /// Identifier of the message
    pub id: MsgId,
    /// Index of the party that sent the message
    pub sender: PartyIndex,
    /// Tells whether the message was broadcast (i.e. it is received by all the parties) or
    /// sent p2p (a private message from `sender`)
    pub msg_type: MessageType,
    /// Body of the received message
    pub msg: M,
}

/// Kind of a received message: broadcast or p2p
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MessageType {
    /// The message was broadcast
    Broadcast,
    /// The message was sent p2p
    P2P,
}

/// Index of a party involved in the protocol
pub type PartyIndex = u16;
/// ID of a received message
///
/// Can be used to retrieve extra information about the message from the delivery layer when
/// needed. E.g. if a malicious party is detected, we need a proof that the received message was
/// sent by this party, so the message id should be used to retrieve the signature and the
/// original message.
pub type MsgId = u64;

impl<M> Incoming<M> {
    /// Turns `Incoming<M>` into `Incoming<T>` by passing the message body through `f`
    ///
    /// `id`, `sender` and `msg_type` are carried over unchanged.
    pub fn map<T, F>(self, f: F) -> Incoming<T>
    where
        F: FnOnce(M) -> T,
    {
        let Self {
            id,
            sender,
            msg_type,
            msg,
        } = self;
        Incoming {
            id,
            sender,
            msg_type,
            msg: f(msg),
        }
    }

    /// Turns `Incoming<M>` into `Result<Incoming<T>, E>` by passing the message body through a
    /// fallible function `fn(M) -> Result<T, E>`
    ///
    /// Returns the error of `f` as is; on success `id`, `sender` and `msg_type` are carried
    /// over unchanged.
    pub fn try_map<T, E, F>(self, f: F) -> Result<Incoming<T>, E>
    where
        F: FnOnce(M) -> Result<T, E>,
    {
        let Self {
            id,
            sender,
            msg_type,
            msg,
        } = self;
        let msg = f(msg)?;
        Ok(Incoming {
            id,
            sender,
            msg_type,
            msg,
        })
    }

    /// Borrows the message body: `&Incoming<M>` becomes `Incoming<&M>`
    pub fn as_ref(&self) -> Incoming<&M> {
        // Matching on `&Self` binds every field by reference; the metadata fields are `Copy`
        // and are copied out, only the body stays borrowed.
        let Self {
            id,
            sender,
            msg_type,
            msg,
        } = self;
        Incoming {
            id: *id,
            sender: *sender,
            msg_type: *msg_type,
            msg,
        }
    }

    /// Returns `true` if the message was broadcast
    pub fn is_broadcast(&self) -> bool {
        match self.msg_type {
            MessageType::Broadcast => true,
            MessageType::P2P => false,
        }
    }

    /// Returns `true` if the message was sent p2p
    pub fn is_p2p(&self) -> bool {
        match self.msg_type {
            MessageType::P2P => true,
            MessageType::Broadcast => false,
        }
    }
}

/// Outgoing message
// `Eq` is derived alongside `PartialEq` so that `Outgoing<M>` is `Eq` whenever `M` is, in line
// with `Incoming` and `MessageDestination`. Bodies that are only `PartialEq` are unaffected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Outgoing<M> {
    /// Message destination: either one party (p2p message) or all parties (broadcast message)
    pub recipient: MessageDestination,
    /// Message being sent
    pub msg: M,
}

impl<M> Outgoing<M> {
    /// Builds an outgoing message addressed to all parties
    pub fn broadcast(msg: M) -> Self {
        let recipient = MessageDestination::AllParties;
        Outgoing { recipient, msg }
    }

    /// Builds an outgoing message addressed to the single party `recipient`
    pub fn p2p(recipient: PartyIndex, msg: M) -> Self {
        let recipient = MessageDestination::OneParty(recipient);
        Outgoing { recipient, msg }
    }

    /// Turns `Outgoing<M>` into `Outgoing<M2>` by passing the message body through `f`
    ///
    /// The recipient is carried over unchanged.
    pub fn map<M2, F>(self, f: F) -> Outgoing<M2>
    where
        F: FnOnce(M) -> M2,
    {
        let Self { recipient, msg } = self;
        Outgoing {
            recipient,
            msg: f(msg),
        }
    }

    /// Borrows the message body: `&Outgoing<M>` becomes `Outgoing<&M>`
    pub fn as_ref(&self) -> Outgoing<&M> {
        // Matching on `&Self` binds both fields by reference; the recipient is `Copy` and is
        // copied out, only the body stays borrowed.
        let Self { recipient, msg } = self;
        Outgoing {
            recipient: *recipient,
            msg,
        }
    }

    /// Returns `true` if the message is addressed to all parties
    pub fn is_broadcast(&self) -> bool {
        let destination = &self.recipient;
        destination.is_broadcast()
    }

    /// Returns `true` if the message is addressed to a single party
    pub fn is_p2p(&self) -> bool {
        let destination = &self.recipient;
        destination.is_p2p()
    }
}

/// Where an outgoing message should be delivered
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MessageDestination {
    /// Every party receives the message (broadcast)
    AllParties,
    /// Only the party with the given index receives the message (p2p)
    OneParty(PartyIndex),
}

impl MessageDestination {
    /// Returns `true` if it's p2p message
    pub fn is_p2p(&self) -> bool {
        matches!(self, MessageDestination::OneParty(_))
    }
    /// Returns `true` if it's broadcast message
    pub fn is_broadcast(&self) -> bool {
        matches!(self, MessageDestination::AllParties { .. })
    }
}