commonware_p2p/authenticated/discovery/actors/tracker/ingress.rs
1use super::Reservation;
2use crate::authenticated::{
3 discovery::{
4 actors::{peer, tracker::Metadata},
5 types,
6 },
7 mailbox::UnboundedMailbox,
8 Mailbox,
9};
10use commonware_cryptography::PublicKey;
11use commonware_utils::{channels::fallible::FallibleExt, ordered::Set};
12use futures::channel::{mpsc, oneshot};
13
14/// Messages that can be sent to the tracker actor.
15#[derive(Debug)]
16pub enum Message<C: PublicKey> {
17 // ---------- Used by oracle ----------
18 /// Register a peer set at a given index.
19 Register { index: u64, peers: Set<C> },
20
21 // ---------- Used by peer set provider ----------
22 /// Fetch the peer set at a given index.
23 PeerSet {
24 /// The index of the peer set to fetch.
25 index: u64,
26 /// One-shot channel to send the peer set.
27 responder: oneshot::Sender<Option<Set<C>>>,
28 },
29 /// Subscribe to notifications when new peer sets are added.
30 Subscribe {
31 /// One-shot channel to send the subscription receiver.
32 #[allow(clippy::type_complexity)]
33 responder: oneshot::Sender<mpsc::UnboundedReceiver<(u64, Set<C>, Set<C>)>>,
34 },
35
36 // ---------- Used by blocker ----------
37 /// Block a peer, disconnecting them if currently connected and preventing future connections
38 /// for as long as the peer remains in at least one active peer set.
39 Block { public_key: C },
40
41 // ---------- Used by peer ----------
42 /// Notify the tracker that a peer has been successfully connected.
43 ///
44 /// The tracker responds with the greeting info that must be sent to the peer
45 /// before any other messages. If the peer is not eligible, the channel is dropped
46 /// (signaling termination).
47 Connect {
48 /// The public key of the peer.
49 public_key: C,
50
51 /// `true` if we are the dialer, `false` if we are the listener.
52 dialer: bool,
53
54 /// One-shot channel to return the greeting info. Dropped if peer is not eligible.
55 responder: oneshot::Sender<types::Info<C>>,
56 },
57
58 /// Ready to send a [types::Payload::BitVec] message to a peer. This message doubles as a
59 /// keep-alive signal to the peer.
60 ///
61 /// This request is formed on a recurring interval.
62 Construct {
63 /// The public key of the peer.
64 public_key: C,
65
66 /// The mailbox of the peer actor.
67 peer: Mailbox<peer::Message<C>>,
68 },
69
70 /// Notify the tracker that a [types::Payload::BitVec] message has been received from a peer.
71 ///
72 /// The tracker will construct a [types::Payload::Peers] message in response.
73 BitVec {
74 /// The bit vector received.
75 bit_vec: types::BitVec,
76
77 /// The mailbox of the peer actor.
78 peer: Mailbox<peer::Message<C>>,
79 },
80
81 /// Notify the tracker that a [types::Payload::Peers] message has been received from a peer.
82 Peers {
83 /// The list of peers received.
84 peers: Vec<types::Info<C>>,
85 },
86
87 // ---------- Used by dialer ----------
88 /// Request a list of dialable peers.
89 Dialable {
90 /// One-shot channel to send the list of dialable peers.
91 responder: oneshot::Sender<Vec<C>>,
92 },
93
94 /// Request a reservation for a particular peer to dial.
95 ///
96 /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if the
97 /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
98 /// has an active reservation).
99 Dial {
100 /// The public key of the peer to reserve.
101 public_key: C,
102
103 /// sender to respond with the reservation.
104 reservation: oneshot::Sender<Option<Reservation<C>>>,
105 },
106
107 // ---------- Used by listener ----------
108 /// Check if a peer is acceptable (can accept an incoming connection from them).
109 Acceptable {
110 /// The public key of the peer to check.
111 public_key: C,
112
113 /// The sender to respond with whether the peer is acceptable.
114 responder: oneshot::Sender<bool>,
115 },
116
117 /// Request a reservation for a particular peer.
118 ///
119 /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if the
120 /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
121 /// has an active reservation).
122 Listen {
123 /// The public key of the peer to reserve.
124 public_key: C,
125
126 /// The sender to respond with the reservation.
127 reservation: oneshot::Sender<Option<Reservation<C>>>,
128 },
129
130 // ---------- Used by reservation ----------
131 /// Release a reservation.
132 Release {
133 /// The metadata of the reservation to release.
134 metadata: Metadata<C>,
135 },
136}
137
138impl<C: PublicKey> UnboundedMailbox<Message<C>> {
139 /// Send a `Connect` message to the tracker and receive the greeting info.
140 ///
141 /// Returns `Some(info)` if the peer is eligible, `None` if the channel was
142 /// dropped (peer not eligible or tracker shut down).
143 pub async fn connect(&mut self, public_key: C, dialer: bool) -> Option<types::Info<C>> {
144 self.0
145 .request(|responder| Message::Connect {
146 public_key,
147 dialer,
148 responder,
149 })
150 .await
151 }
152
153 /// Send a `Construct` message to the tracker.
154 pub fn construct(&mut self, public_key: C, peer: Mailbox<peer::Message<C>>) {
155 self.0.send_lossy(Message::Construct { public_key, peer });
156 }
157
158 /// Send a `BitVec` message to the tracker.
159 pub fn bit_vec(&mut self, bit_vec: types::BitVec, peer: Mailbox<peer::Message<C>>) {
160 self.0.send_lossy(Message::BitVec { bit_vec, peer });
161 }
162
163 /// Send a `Peers` message to the tracker.
164 pub fn peers(&mut self, peers: Vec<types::Info<C>>) {
165 self.0.send_lossy(Message::Peers { peers });
166 }
167
168 /// Request a list of dialable peers from the tracker.
169 ///
170 /// Returns an empty list if the tracker is shut down.
171 pub async fn dialable(&mut self) -> Vec<C> {
172 self.0
173 .request_or_default(|responder| Message::Dialable { responder })
174 .await
175 }
176
177 /// Send a `Dial` message to the tracker.
178 ///
179 /// Returns `None` if the tracker is shut down.
180 pub async fn dial(&mut self, public_key: C) -> Option<Reservation<C>> {
181 self.0
182 .request(|reservation| Message::Dial {
183 public_key,
184 reservation,
185 })
186 .await
187 .flatten()
188 }
189
190 /// Send an `Acceptable` message to the tracker.
191 ///
192 /// Returns `false` if the tracker is shut down.
193 pub async fn acceptable(&mut self, public_key: C) -> bool {
194 self.0
195 .request_or(
196 |responder| Message::Acceptable {
197 public_key,
198 responder,
199 },
200 false,
201 )
202 .await
203 }
204
205 /// Send a `Listen` message to the tracker.
206 ///
207 /// Returns `None` if the tracker is shut down.
208 pub async fn listen(&mut self, public_key: C) -> Option<Reservation<C>> {
209 self.0
210 .request(|reservation| Message::Listen {
211 public_key,
212 reservation,
213 })
214 .await
215 .flatten()
216 }
217}
218
219/// Allows releasing reservations
220#[derive(Clone)]
221pub struct Releaser<C: PublicKey> {
222 sender: UnboundedMailbox<Message<C>>,
223}
224
225impl<C: PublicKey> Releaser<C> {
226 /// Create a new releaser.
227 pub(super) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
228 Self { sender }
229 }
230
231 /// Release a reservation.
232 pub fn release(&mut self, metadata: Metadata<C>) {
233 self.sender.0.send_lossy(Message::Release { metadata });
234 }
235}
236
237/// Mechanism to register authorized peers.
238///
239/// Peers that are not explicitly authorized
240/// will be blocked by commonware-p2p.
241#[derive(Debug, Clone)]
242pub struct Oracle<C: PublicKey> {
243 sender: UnboundedMailbox<Message<C>>,
244}
245
246impl<C: PublicKey> Oracle<C> {
247 pub(super) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
248 Self { sender }
249 }
250}
251
252impl<C: PublicKey> crate::Manager for Oracle<C> {
253 type PublicKey = C;
254 type Peers = Set<C>;
255
256 /// Register a set of authorized peers at a given index.
257 ///
258 /// These peer sets are used to construct a bit vector (sorted by public key)
259 /// to share knowledge about dialable IPs. If a peer does not yet have an index
260 /// associated with a bit vector, the discovery message will be dropped.
261 ///
262 /// # Parameters
263 ///
264 /// * `index` - Index of the set of authorized peers (like a blockchain height).
265 /// Must be monotonically increasing, per the rules of [Set].
266 /// * `peers` - Vector of authorized peers at an `index` (does not need to be sorted).
267 async fn update(&mut self, index: u64, peers: Self::Peers) {
268 self.sender.0.send_lossy(Message::Register { index, peers });
269 }
270
271 async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
272 self.sender
273 .0
274 .request(|responder| Message::PeerSet {
275 index: id,
276 responder,
277 })
278 .await
279 .flatten()
280 }
281
282 async fn subscribe(
283 &mut self,
284 ) -> mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)> {
285 self.sender
286 .0
287 .request(|responder| Message::Subscribe { responder })
288 .await
289 .unwrap_or_else(|| {
290 let (_, rx) = mpsc::unbounded();
291 rx
292 })
293 }
294}
295
296impl<C: PublicKey> crate::Blocker for Oracle<C> {
297 type PublicKey = C;
298
299 async fn block(&mut self, public_key: Self::PublicKey) {
300 self.sender.0.send_lossy(Message::Block { public_key });
301 }
302}