commonware_p2p/authenticated/discovery/actors/tracker/ingress.rs
1use super::Reservation;
2use crate::{
3 authenticated::{
4 dialing::Dialable,
5 discovery::{
6 actors::{peer, tracker::Metadata},
7 types,
8 },
9 },
10 PeerSetSubscription, TrackedPeers,
11};
12use commonware_actor::{
13 mailbox::{self, Policy},
14 Feedback,
15};
16use commonware_cryptography::PublicKey;
17use commonware_utils::channel::{mpsc, oneshot};
18use std::collections::VecDeque;
19
20/// Messages that can be sent to the tracker actor.
21#[derive(Debug)]
22pub enum Message<C: PublicKey> {
23 // ---------- Used by oracle ----------
24 /// Register a peer set at a given index.
25 Register { index: u64, peers: TrackedPeers<C> },
26
27 // ---------- Used by peer set provider ----------
28 /// Fetch primary and secondary peers for a given ID.
29 PeerSet {
30 /// The index of the peer set to fetch.
31 index: u64,
32 /// One-shot channel to send the tracked peers.
33 responder: oneshot::Sender<Option<TrackedPeers<C>>>,
34 },
35 /// Subscribe to notifications when new peer sets are added.
36 Subscribe {
37 /// One-shot channel to send the subscription receiver.
38 responder: oneshot::Sender<PeerSetSubscription<C>>,
39 },
40
41 // ---------- Used by blocker ----------
42 /// Block a peer, disconnecting them if currently connected and preventing future connections
43 /// for as long as the peer remains in at least one active peer set.
44 Block { public_key: C },
45
46 // ---------- Used by peer ----------
47 /// Notify the tracker that a peer has been successfully connected.
48 ///
49 /// The tracker responds with the greeting info that must be sent to the peer
50 /// before any other messages. If the peer is not eligible, the channel is dropped
51 /// (signaling termination).
52 Connect {
53 /// The public key of the peer.
54 public_key: C,
55
56 /// The mailbox of the peer actor.
57 peer: peer::Mailbox<C>,
58
59 /// `true` if we are the dialer, `false` if we are the listener.
60 dialer: bool,
61
62 /// One-shot channel to return the greeting info. Dropped if peer is not eligible.
63 responder: oneshot::Sender<types::Info<C>>,
64 },
65
66 /// Ready to send a [types::Payload::BitVec] message to a peer. This message doubles as a
67 /// keep-alive signal to the peer.
68 ///
69 /// This request is formed on a recurring interval. The tracker sends any response to the
70 /// mailbox stored by the peer's most recent [`Message::Connect`].
71 Construct {
72 /// The public key of the peer.
73 public_key: C,
74 },
75
76 /// Notify the tracker that a [types::Payload::BitVec] message has been received from a peer.
77 ///
78 /// The tracker will construct a [types::Payload::Peers] message in response and send it to the
79 /// mailbox stored by the peer's most recent [`Message::Connect`].
80 BitVec {
81 /// The public key of the peer that sent the bit vector.
82 public_key: C,
83
84 /// The bit vector received.
85 bit_vec: types::BitVec,
86 },
87
88 /// Notify the tracker that a [types::Payload::Peers] message has been received from a peer.
89 Peers {
90 /// The list of peers received.
91 peers: Vec<types::Info<C>>,
92 },
93
94 // ---------- Used by dialer ----------
95 /// Request a list of dialable peers.
96 Dialable {
97 /// One-shot channel to send the dialable peers and next query deadline.
98 responder: oneshot::Sender<Dialable<C>>,
99 },
100
101 /// Request a reservation for a particular peer to dial.
102 ///
103 /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if the
104 /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
105 /// has an active reservation).
106 Dial {
107 /// The public key of the peer to reserve.
108 public_key: C,
109
110 /// sender to respond with the reservation.
111 reservation: oneshot::Sender<Option<Reservation<C>>>,
112 },
113
114 // ---------- Used by listener ----------
115 /// Check if a peer is acceptable (can accept an incoming connection from them).
116 Acceptable {
117 /// The public key of the peer to check.
118 public_key: C,
119
120 /// The sender to respond with whether the peer is acceptable.
121 responder: oneshot::Sender<bool>,
122 },
123
124 /// Request a reservation for a particular peer.
125 ///
126 /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if the
127 /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
128 /// has an active reservation).
129 Listen {
130 /// The public key of the peer to reserve.
131 public_key: C,
132
133 /// The sender to respond with the reservation.
134 reservation: oneshot::Sender<Option<Reservation<C>>>,
135 },
136
137 // ---------- Used by reservation ----------
138 /// Release a reservation.
139 Release {
140 /// The metadata of the reservation to release.
141 metadata: Metadata<C>,
142 },
143}
144
145impl<C: PublicKey> Policy for Message<C> {
146 type Overflow = VecDeque<Self>;
147
148 fn handle(overflow: &mut Self::Overflow, message: Self) {
149 overflow.push_back(message);
150 }
151}
152
153/// Mailbox for sending messages to the tracker actor.
154#[derive(Clone, Debug)]
155pub struct Mailbox<C: PublicKey>(mailbox::Sender<Message<C>>);
156
157impl<C: PublicKey> Mailbox<C> {
158 pub(crate) const fn new(sender: mailbox::Sender<Message<C>>) -> Self {
159 Self(sender)
160 }
161
162 /// Send a `Connect` message to the tracker and receive the greeting info.
163 ///
164 /// Returns `Some(info)` if the peer is eligible, `None` if the channel was
165 /// dropped (peer not eligible or tracker shut down).
166 pub(crate) async fn connect(
167 &self,
168 public_key: C,
169 peer: peer::Mailbox<C>,
170 dialer: bool,
171 ) -> Option<types::Info<C>> {
172 let (responder, receiver) = oneshot::channel();
173 let _ = self.0.enqueue(Message::Connect {
174 public_key,
175 peer,
176 dialer,
177 responder,
178 });
179 receiver.await.ok()
180 }
181
182 /// Send a `Construct` message to the tracker.
183 pub(crate) fn construct(&self, public_key: C) -> Feedback {
184 self.0.enqueue(Message::Construct { public_key })
185 }
186
187 /// Send a `BitVec` message to the tracker.
188 pub(crate) fn bit_vec(&self, public_key: C, bit_vec: types::BitVec) -> Feedback {
189 self.0.enqueue(Message::BitVec {
190 public_key,
191 bit_vec,
192 })
193 }
194
195 /// Send a `Peers` message to the tracker.
196 pub(crate) fn peers(&self, peers: Vec<types::Info<C>>) -> Feedback {
197 self.0.enqueue(Message::Peers { peers })
198 }
199
200 /// Request dialable peers from the tracker.
201 ///
202 /// Returns an empty response if the tracker is shut down.
203 pub(crate) async fn dialable(&self) -> Dialable<C> {
204 let (responder, receiver) = oneshot::channel();
205 let _ = self.0.enqueue(Message::Dialable { responder });
206 receiver.await.unwrap_or_default()
207 }
208
209 /// Send a `Dial` message to the tracker.
210 ///
211 /// Returns `None` if the tracker is shut down.
212 pub(crate) async fn dial(&self, public_key: C) -> Option<Reservation<C>> {
213 let (reservation, receiver) = oneshot::channel();
214 let _ = self.0.enqueue(Message::Dial {
215 public_key,
216 reservation,
217 });
218 receiver.await.ok().flatten()
219 }
220
221 /// Send an `Acceptable` message to the tracker.
222 ///
223 /// Returns `false` if the tracker is shut down.
224 pub(crate) async fn acceptable(&self, public_key: C) -> bool {
225 let (responder, receiver) = oneshot::channel();
226 let _ = self.0.enqueue(Message::Acceptable {
227 public_key,
228 responder,
229 });
230 receiver.await.unwrap_or(false)
231 }
232
233 /// Send a `Listen` message to the tracker.
234 ///
235 /// Returns `None` if the tracker is shut down.
236 pub(crate) async fn listen(&self, public_key: C) -> Option<Reservation<C>> {
237 let (reservation, receiver) = oneshot::channel();
238 let _ = self.0.enqueue(Message::Listen {
239 public_key,
240 reservation,
241 });
242 receiver.await.ok().flatten()
243 }
244}
245
246/// Allows releasing reservations
247#[derive(Clone, Debug)]
248pub struct Releaser<C: PublicKey> {
249 sender: mailbox::Sender<Message<C>>,
250}
251
252impl<C: PublicKey> Releaser<C> {
253 /// Create a new releaser.
254 pub(crate) const fn new(sender: mailbox::Sender<Message<C>>) -> Self {
255 Self { sender }
256 }
257
258 /// Release a reservation.
259 pub fn release(&mut self, metadata: Metadata<C>) -> Feedback {
260 self.sender.enqueue(Message::Release { metadata })
261 }
262}
263
264/// Mechanism to register authorized peers.
265///
266/// Peers that are not explicitly authorized
267/// will be blocked by commonware-p2p.
268#[derive(Debug, Clone)]
269pub struct Oracle<C: PublicKey> {
270 sender: mailbox::Sender<Message<C>>,
271}
272
273impl<C: PublicKey> Oracle<C> {
274 pub(super) const fn new(sender: mailbox::Sender<Message<C>>) -> Self {
275 Self { sender }
276 }
277}
278
279impl<C: PublicKey> crate::Provider for Oracle<C> {
280 type PublicKey = C;
281
282 async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
283 let (responder, receiver) = oneshot::channel();
284 let _ = self.sender.enqueue(Message::PeerSet {
285 index: id,
286 responder,
287 });
288 receiver.await.ok().flatten()
289 }
290
291 async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
292 let (responder, receiver) = oneshot::channel();
293 let _ = self.sender.enqueue(Message::Subscribe { responder });
294 receiver.await.unwrap_or_else(|_| {
295 let (_, rx) = mpsc::unbounded_channel();
296 rx
297 })
298 }
299}
300
301impl<C: PublicKey> crate::Manager for Oracle<C> {
302 fn track<R>(&mut self, index: u64, peers: R) -> Feedback
303 where
304 R: Into<TrackedPeers<Self::PublicKey>> + Send,
305 {
306 self.sender.enqueue(Message::Register {
307 index,
308 peers: peers.into(),
309 })
310 }
311}
312
313impl<C: PublicKey> crate::Blocker for Oracle<C> {
314 type PublicKey = C;
315
316 fn block(&mut self, public_key: Self::PublicKey) -> Feedback {
317 self.sender.enqueue(Message::Block { public_key })
318 }
319}