1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
use super::Reservation;
use crate::{
authenticated::{
dialing::Dialable,
discovery::{
actors::{peer, tracker::Metadata},
types,
},
mailbox::UnboundedMailbox,
Mailbox,
},
PeerSetSubscription, TrackedPeers,
};
use commonware_cryptography::PublicKey;
use commonware_utils::channel::{fallible::FallibleExt, mpsc, oneshot};
/// Messages that can be sent to the tracker actor.
#[derive(Debug)]
pub enum Message<C: PublicKey> {
// ---------- Used by oracle ----------
/// Register a peer set at a given index.
Register { index: u64, peers: TrackedPeers<C> },
// ---------- Used by peer set provider ----------
/// Fetch primary and secondary peers for a given ID.
PeerSet {
/// The index of the peer set to fetch.
index: u64,
/// One-shot channel to send the tracked peers.
responder: oneshot::Sender<Option<TrackedPeers<C>>>,
},
/// Subscribe to notifications when new peer sets are added.
Subscribe {
/// One-shot channel to send the subscription receiver.
responder: oneshot::Sender<PeerSetSubscription<C>>,
},
// ---------- Used by blocker ----------
/// Block a peer, disconnecting them if currently connected and preventing future connections
/// for as long as the peer remains in at least one active peer set.
Block { public_key: C },
// ---------- Used by peer ----------
/// Notify the tracker that a peer has been successfully connected.
///
/// The tracker responds with the greeting info that must be sent to the peer
/// before any other messages. If the peer is not eligible, the channel is dropped
/// (signaling termination).
Connect {
/// The public key of the peer.
public_key: C,
/// `true` if we are the dialer, `false` if we are the listener.
dialer: bool,
/// One-shot channel to return the greeting info. Dropped if peer is not eligible.
responder: oneshot::Sender<types::Info<C>>,
},
/// Ready to send a [types::Payload::BitVec] message to a peer. This message doubles as a
/// keep-alive signal to the peer.
///
/// This request is formed on a recurring interval.
Construct {
/// The public key of the peer.
public_key: C,
/// The mailbox of the peer actor.
peer: Mailbox<peer::Message<C>>,
},
/// Notify the tracker that a [types::Payload::BitVec] message has been received from a peer.
///
/// The tracker will construct a [types::Payload::Peers] message in response.
BitVec {
/// The bit vector received.
bit_vec: types::BitVec,
/// The mailbox of the peer actor.
peer: Mailbox<peer::Message<C>>,
},
/// Notify the tracker that a [types::Payload::Peers] message has been received from a peer.
Peers {
/// The list of peers received.
peers: Vec<types::Info<C>>,
},
// ---------- Used by dialer ----------
/// Request a list of dialable peers.
Dialable {
/// One-shot channel to send the dialable peers and next query deadline.
responder: oneshot::Sender<Dialable<C>>,
},
/// Request a reservation for a particular peer to dial.
///
/// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if the
/// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
/// has an active reservation).
Dial {
/// The public key of the peer to reserve.
public_key: C,
/// sender to respond with the reservation.
reservation: oneshot::Sender<Option<Reservation<C>>>,
},
// ---------- Used by listener ----------
/// Check if a peer is acceptable (can accept an incoming connection from them).
Acceptable {
/// The public key of the peer to check.
public_key: C,
/// The sender to respond with whether the peer is acceptable.
responder: oneshot::Sender<bool>,
},
/// Request a reservation for a particular peer.
///
/// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if the
/// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
/// has an active reservation).
Listen {
/// The public key of the peer to reserve.
public_key: C,
/// The sender to respond with the reservation.
reservation: oneshot::Sender<Option<Reservation<C>>>,
},
// ---------- Used by reservation ----------
/// Release a reservation.
Release {
/// The metadata of the reservation to release.
metadata: Metadata<C>,
},
}
impl<C: PublicKey> UnboundedMailbox<Message<C>> {
/// Send a `Connect` message to the tracker and receive the greeting info.
///
/// Returns `Some(info)` if the peer is eligible, `None` if the channel was
/// dropped (peer not eligible or tracker shut down).
pub async fn connect(&mut self, public_key: C, dialer: bool) -> Option<types::Info<C>> {
self.0
.request(|responder| Message::Connect {
public_key,
dialer,
responder,
})
.await
}
/// Send a `Construct` message to the tracker.
pub fn construct(&mut self, public_key: C, peer: Mailbox<peer::Message<C>>) {
self.0.send_lossy(Message::Construct { public_key, peer });
}
/// Send a `BitVec` message to the tracker.
pub fn bit_vec(&mut self, bit_vec: types::BitVec, peer: Mailbox<peer::Message<C>>) {
self.0.send_lossy(Message::BitVec { bit_vec, peer });
}
/// Send a `Peers` message to the tracker.
pub fn peers(&mut self, peers: Vec<types::Info<C>>) {
self.0.send_lossy(Message::Peers { peers });
}
/// Request dialable peers from the tracker.
///
/// Returns an empty response if the tracker is shut down.
pub async fn dialable(&mut self) -> Dialable<C> {
self.0
.request_or_default(|responder| Message::Dialable { responder })
.await
}
/// Send a `Dial` message to the tracker.
///
/// Returns `None` if the tracker is shut down.
pub async fn dial(&mut self, public_key: C) -> Option<Reservation<C>> {
self.0
.request(|reservation| Message::Dial {
public_key,
reservation,
})
.await
.flatten()
}
/// Send an `Acceptable` message to the tracker.
///
/// Returns `false` if the tracker is shut down.
pub async fn acceptable(&mut self, public_key: C) -> bool {
self.0
.request_or(
|responder| Message::Acceptable {
public_key,
responder,
},
false,
)
.await
}
/// Send a `Listen` message to the tracker.
///
/// Returns `None` if the tracker is shut down.
pub async fn listen(&mut self, public_key: C) -> Option<Reservation<C>> {
self.0
.request(|reservation| Message::Listen {
public_key,
reservation,
})
.await
.flatten()
}
}
/// Allows releasing reservations
#[derive(Clone, Debug)]
pub struct Releaser<C: PublicKey> {
sender: UnboundedMailbox<Message<C>>,
}
impl<C: PublicKey> Releaser<C> {
/// Create a new releaser.
pub(crate) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
Self { sender }
}
/// Release a reservation.
pub fn release(&mut self, metadata: Metadata<C>) {
self.sender.0.send_lossy(Message::Release { metadata });
}
}
/// Mechanism to register authorized peers.
///
/// Peers that are not explicitly authorized
/// will be blocked by commonware-p2p.
#[derive(Debug, Clone)]
pub struct Oracle<C: PublicKey> {
sender: UnboundedMailbox<Message<C>>,
}
impl<C: PublicKey> Oracle<C> {
pub(super) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
Self { sender }
}
}
impl<C: PublicKey> crate::Provider for Oracle<C> {
type PublicKey = C;
async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
self.sender
.0
.request(|responder| Message::PeerSet {
index: id,
responder,
})
.await
.flatten()
}
async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
self.sender
.0
.request(|responder| Message::Subscribe { responder })
.await
.unwrap_or_else(|| {
let (_, rx) = mpsc::unbounded_channel();
rx
})
}
}
impl<C: PublicKey> crate::Manager for Oracle<C> {
async fn track<R>(&mut self, index: u64, peers: R)
where
R: Into<TrackedPeers<Self::PublicKey>> + Send,
{
self.sender.0.send_lossy(Message::Register {
index,
peers: peers.into(),
});
}
}
impl<C: PublicKey> crate::Blocker for Oracle<C> {
type PublicKey = C;
async fn block(&mut self, public_key: Self::PublicKey) {
self.sender.0.send_lossy(Message::Block { public_key });
}
}