commonware_p2p/authenticated/discovery/actors/tracker/ingress.rs
1use super::Reservation;
2use crate::{
3 authenticated::{
4 discovery::{
5 actors::{peer, tracker::Metadata},
6 types,
7 },
8 mailbox::UnboundedMailbox,
9 Mailbox,
10 },
11 PeerSetSubscription,
12};
13use commonware_cryptography::PublicKey;
14use commonware_utils::{
15 channel::{fallible::FallibleExt, mpsc, oneshot},
16 ordered::Set,
17};
18
/// Messages that can be sent to the tracker actor.
///
/// Variants are grouped by the component that sends them. Variants that expect a reply carry a
/// one-shot sender on which the tracker delivers its response.
#[derive(Debug)]
pub enum Message<C: PublicKey> {
    // ---------- Used by oracle ----------
    /// Register a peer set at a given index.
    ///
    /// `index` identifies the peer set and `peers` is the set of public keys it contains.
    Register { index: u64, peers: Set<C> },

    // ---------- Used by peer set provider ----------
    /// Fetch the peer set at a given index.
    PeerSet {
        /// The index of the peer set to fetch.
        index: u64,
        /// One-shot channel to send the peer set (an `Option`, so the tracker can answer
        /// with `None`).
        responder: oneshot::Sender<Option<Set<C>>>,
    },
    /// Subscribe to notifications when new peer sets are added.
    Subscribe {
        /// One-shot channel to send the subscription receiver.
        responder: oneshot::Sender<PeerSetSubscription<C>>,
    },

    // ---------- Used by blocker ----------
    /// Block a peer, disconnecting them if currently connected and preventing future connections
    /// for as long as the peer remains in at least one active peer set.
    Block { public_key: C },

    // ---------- Used by peer ----------
    /// Notify the tracker that a peer has been successfully connected.
    ///
    /// The tracker responds with the greeting info that must be sent to the peer
    /// before any other messages. If the peer is not eligible, the channel is dropped
    /// (signaling termination).
    Connect {
        /// The public key of the peer.
        public_key: C,

        /// `true` if we are the dialer, `false` if we are the listener.
        dialer: bool,

        /// One-shot channel to return the greeting info. Dropped if peer is not eligible.
        responder: oneshot::Sender<types::Info<C>>,
    },

    /// Ready to send a [types::Payload::BitVec] message to a peer. This message doubles as a
    /// keep-alive signal to the peer.
    ///
    /// This request is formed on a recurring interval.
    Construct {
        /// The public key of the peer.
        public_key: C,

        /// The mailbox of the peer actor.
        peer: Mailbox<peer::Message<C>>,
    },

    /// Notify the tracker that a [types::Payload::BitVec] message has been received from a peer.
    ///
    /// The tracker will construct a [types::Payload::Peers] message in response.
    BitVec {
        /// The bit vector received.
        bit_vec: types::BitVec,

        /// The mailbox of the peer actor.
        peer: Mailbox<peer::Message<C>>,
    },

    /// Notify the tracker that a [types::Payload::Peers] message has been received from a peer.
    Peers {
        /// The list of peers received.
        peers: Vec<types::Info<C>>,
    },

    // ---------- Used by dialer ----------
    /// Request a list of dialable peers.
    Dialable {
        /// One-shot channel to send the list of dialable peers.
        responder: oneshot::Sender<Vec<C>>,
    },

    /// Request a reservation for a particular peer to dial.
    ///
    /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if the
    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
    /// has an active reservation).
    Dial {
        /// The public key of the peer to reserve.
        public_key: C,

        /// The sender to respond with the reservation.
        reservation: oneshot::Sender<Option<Reservation<C>>>,
    },

    // ---------- Used by listener ----------
    /// Check if a peer is acceptable (can accept an incoming connection from them).
    Acceptable {
        /// The public key of the peer to check.
        public_key: C,

        /// The sender to respond with whether the peer is acceptable.
        responder: oneshot::Sender<bool>,
    },

    /// Request a reservation for a particular peer.
    ///
    /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if the
    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
    /// has an active reservation).
    Listen {
        /// The public key of the peer to reserve.
        public_key: C,

        /// The sender to respond with the reservation.
        reservation: oneshot::Sender<Option<Reservation<C>>>,
    },

    // ---------- Used by reservation ----------
    /// Release a reservation.
    Release {
        /// The metadata of the reservation to release.
        metadata: Metadata<C>,
    },
}
141
142impl<C: PublicKey> UnboundedMailbox<Message<C>> {
143 /// Send a `Connect` message to the tracker and receive the greeting info.
144 ///
145 /// Returns `Some(info)` if the peer is eligible, `None` if the channel was
146 /// dropped (peer not eligible or tracker shut down).
147 pub async fn connect(&mut self, public_key: C, dialer: bool) -> Option<types::Info<C>> {
148 self.0
149 .request(|responder| Message::Connect {
150 public_key,
151 dialer,
152 responder,
153 })
154 .await
155 }
156
157 /// Send a `Construct` message to the tracker.
158 pub fn construct(&mut self, public_key: C, peer: Mailbox<peer::Message<C>>) {
159 self.0.send_lossy(Message::Construct { public_key, peer });
160 }
161
162 /// Send a `BitVec` message to the tracker.
163 pub fn bit_vec(&mut self, bit_vec: types::BitVec, peer: Mailbox<peer::Message<C>>) {
164 self.0.send_lossy(Message::BitVec { bit_vec, peer });
165 }
166
167 /// Send a `Peers` message to the tracker.
168 pub fn peers(&mut self, peers: Vec<types::Info<C>>) {
169 self.0.send_lossy(Message::Peers { peers });
170 }
171
172 /// Request a list of dialable peers from the tracker.
173 ///
174 /// Returns an empty list if the tracker is shut down.
175 pub async fn dialable(&mut self) -> Vec<C> {
176 self.0
177 .request_or_default(|responder| Message::Dialable { responder })
178 .await
179 }
180
181 /// Send a `Dial` message to the tracker.
182 ///
183 /// Returns `None` if the tracker is shut down.
184 pub async fn dial(&mut self, public_key: C) -> Option<Reservation<C>> {
185 self.0
186 .request(|reservation| Message::Dial {
187 public_key,
188 reservation,
189 })
190 .await
191 .flatten()
192 }
193
194 /// Send an `Acceptable` message to the tracker.
195 ///
196 /// Returns `false` if the tracker is shut down.
197 pub async fn acceptable(&mut self, public_key: C) -> bool {
198 self.0
199 .request_or(
200 |responder| Message::Acceptable {
201 public_key,
202 responder,
203 },
204 false,
205 )
206 .await
207 }
208
209 /// Send a `Listen` message to the tracker.
210 ///
211 /// Returns `None` if the tracker is shut down.
212 pub async fn listen(&mut self, public_key: C) -> Option<Reservation<C>> {
213 self.0
214 .request(|reservation| Message::Listen {
215 public_key,
216 reservation,
217 })
218 .await
219 .flatten()
220 }
221}
222
/// Allows releasing reservations.
///
/// Wraps a tracker mailbox; releasing a reservation sends a [`Message::Release`] through it.
#[derive(Clone, Debug)]
pub struct Releaser<C: PublicKey> {
    /// Mailbox used to send [`Message`]s to the tracker actor.
    sender: UnboundedMailbox<Message<C>>,
}
228
229impl<C: PublicKey> Releaser<C> {
230 /// Create a new releaser.
231 pub(crate) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
232 Self { sender }
233 }
234
235 /// Release a reservation.
236 pub fn release(&mut self, metadata: Metadata<C>) {
237 self.sender.0.send_lossy(Message::Release { metadata });
238 }
239}
240
/// Mechanism to register authorized peers.
///
/// Peers that are not explicitly authorized
/// will be blocked by commonware-p2p.
///
/// Every operation on this type is carried out by sending a [`Message`] to the tracker actor
/// through the wrapped mailbox.
#[derive(Debug, Clone)]
pub struct Oracle<C: PublicKey> {
    /// Mailbox used to send [`Message`]s to the tracker actor.
    sender: UnboundedMailbox<Message<C>>,
}
249
impl<C: PublicKey> Oracle<C> {
    /// Create a new oracle that talks to the tracker through `sender`.
    pub(super) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
        Self { sender }
    }
}
255
256impl<C: PublicKey> crate::Provider for Oracle<C> {
257 type PublicKey = C;
258
259 async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
260 self.sender
261 .0
262 .request(|responder| Message::PeerSet {
263 index: id,
264 responder,
265 })
266 .await
267 .flatten()
268 }
269
270 async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
271 self.sender
272 .0
273 .request(|responder| Message::Subscribe { responder })
274 .await
275 .unwrap_or_else(|| {
276 let (_, rx) = mpsc::unbounded_channel();
277 rx
278 })
279 }
280}
281
282impl<C: PublicKey> crate::Manager for Oracle<C> {
283 async fn track(&mut self, index: u64, peers: Set<Self::PublicKey>) {
284 self.sender.0.send_lossy(Message::Register { index, peers });
285 }
286}
287
288impl<C: PublicKey> crate::Blocker for Oracle<C> {
289 type PublicKey = C;
290
291 async fn block(&mut self, public_key: Self::PublicKey) {
292 self.sender.0.send_lossy(Message::Block { public_key });
293 }
294}