commonware_p2p/authenticated/discovery/actors/tracker/ingress.rs
1use super::Reservation;
2use crate::authenticated::{
3 discovery::{
4 actors::{peer, tracker::Metadata},
5 types,
6 },
7 mailbox::UnboundedMailbox,
8 Mailbox,
9};
10use commonware_cryptography::PublicKey;
11use commonware_utils::{
12 channel::{fallible::FallibleExt, mpsc, oneshot},
13 ordered::Set,
14};
15
16/// Messages that can be sent to the tracker actor.
17#[derive(Debug)]
18pub enum Message<C: PublicKey> {
19 // ---------- Used by oracle ----------
20 /// Register a peer set at a given index.
21 Register { index: u64, peers: Set<C> },
22
23 // ---------- Used by peer set provider ----------
24 /// Fetch the peer set at a given index.
25 PeerSet {
26 /// The index of the peer set to fetch.
27 index: u64,
28 /// One-shot channel to send the peer set.
29 responder: oneshot::Sender<Option<Set<C>>>,
30 },
31 /// Subscribe to notifications when new peer sets are added.
32 Subscribe {
33 /// One-shot channel to send the subscription receiver.
34 #[allow(clippy::type_complexity)]
35 responder: oneshot::Sender<mpsc::UnboundedReceiver<(u64, Set<C>, Set<C>)>>,
36 },
37
38 // ---------- Used by blocker ----------
39 /// Block a peer, disconnecting them if currently connected and preventing future connections
40 /// for as long as the peer remains in at least one active peer set.
41 Block { public_key: C },
42
43 // ---------- Used by peer ----------
44 /// Notify the tracker that a peer has been successfully connected.
45 ///
46 /// The tracker responds with the greeting info that must be sent to the peer
47 /// before any other messages. If the peer is not eligible, the channel is dropped
48 /// (signaling termination).
49 Connect {
50 /// The public key of the peer.
51 public_key: C,
52
53 /// `true` if we are the dialer, `false` if we are the listener.
54 dialer: bool,
55
56 /// One-shot channel to return the greeting info. Dropped if peer is not eligible.
57 responder: oneshot::Sender<types::Info<C>>,
58 },
59
60 /// Ready to send a [types::Payload::BitVec] message to a peer. This message doubles as a
61 /// keep-alive signal to the peer.
62 ///
63 /// This request is formed on a recurring interval.
64 Construct {
65 /// The public key of the peer.
66 public_key: C,
67
68 /// The mailbox of the peer actor.
69 peer: Mailbox<peer::Message<C>>,
70 },
71
72 /// Notify the tracker that a [types::Payload::BitVec] message has been received from a peer.
73 ///
74 /// The tracker will construct a [types::Payload::Peers] message in response.
75 BitVec {
76 /// The bit vector received.
77 bit_vec: types::BitVec,
78
79 /// The mailbox of the peer actor.
80 peer: Mailbox<peer::Message<C>>,
81 },
82
83 /// Notify the tracker that a [types::Payload::Peers] message has been received from a peer.
84 Peers {
85 /// The list of peers received.
86 peers: Vec<types::Info<C>>,
87 },
88
89 // ---------- Used by dialer ----------
90 /// Request a list of dialable peers.
91 Dialable {
92 /// One-shot channel to send the list of dialable peers.
93 responder: oneshot::Sender<Vec<C>>,
94 },
95
96 /// Request a reservation for a particular peer to dial.
97 ///
98 /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if the
99 /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
100 /// has an active reservation).
101 Dial {
102 /// The public key of the peer to reserve.
103 public_key: C,
104
105 /// sender to respond with the reservation.
106 reservation: oneshot::Sender<Option<Reservation<C>>>,
107 },
108
109 // ---------- Used by listener ----------
110 /// Check if a peer is acceptable (can accept an incoming connection from them).
111 Acceptable {
112 /// The public key of the peer to check.
113 public_key: C,
114
115 /// The sender to respond with whether the peer is acceptable.
116 responder: oneshot::Sender<bool>,
117 },
118
119 /// Request a reservation for a particular peer.
120 ///
121 /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if the
122 /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
123 /// has an active reservation).
124 Listen {
125 /// The public key of the peer to reserve.
126 public_key: C,
127
128 /// The sender to respond with the reservation.
129 reservation: oneshot::Sender<Option<Reservation<C>>>,
130 },
131
132 // ---------- Used by reservation ----------
133 /// Release a reservation.
134 Release {
135 /// The metadata of the reservation to release.
136 metadata: Metadata<C>,
137 },
138}
139
140impl<C: PublicKey> UnboundedMailbox<Message<C>> {
141 /// Send a `Connect` message to the tracker and receive the greeting info.
142 ///
143 /// Returns `Some(info)` if the peer is eligible, `None` if the channel was
144 /// dropped (peer not eligible or tracker shut down).
145 pub async fn connect(&mut self, public_key: C, dialer: bool) -> Option<types::Info<C>> {
146 self.0
147 .request(|responder| Message::Connect {
148 public_key,
149 dialer,
150 responder,
151 })
152 .await
153 }
154
155 /// Send a `Construct` message to the tracker.
156 pub fn construct(&mut self, public_key: C, peer: Mailbox<peer::Message<C>>) {
157 self.0.send_lossy(Message::Construct { public_key, peer });
158 }
159
160 /// Send a `BitVec` message to the tracker.
161 pub fn bit_vec(&mut self, bit_vec: types::BitVec, peer: Mailbox<peer::Message<C>>) {
162 self.0.send_lossy(Message::BitVec { bit_vec, peer });
163 }
164
165 /// Send a `Peers` message to the tracker.
166 pub fn peers(&mut self, peers: Vec<types::Info<C>>) {
167 self.0.send_lossy(Message::Peers { peers });
168 }
169
170 /// Request a list of dialable peers from the tracker.
171 ///
172 /// Returns an empty list if the tracker is shut down.
173 pub async fn dialable(&mut self) -> Vec<C> {
174 self.0
175 .request_or_default(|responder| Message::Dialable { responder })
176 .await
177 }
178
179 /// Send a `Dial` message to the tracker.
180 ///
181 /// Returns `None` if the tracker is shut down.
182 pub async fn dial(&mut self, public_key: C) -> Option<Reservation<C>> {
183 self.0
184 .request(|reservation| Message::Dial {
185 public_key,
186 reservation,
187 })
188 .await
189 .flatten()
190 }
191
192 /// Send an `Acceptable` message to the tracker.
193 ///
194 /// Returns `false` if the tracker is shut down.
195 pub async fn acceptable(&mut self, public_key: C) -> bool {
196 self.0
197 .request_or(
198 |responder| Message::Acceptable {
199 public_key,
200 responder,
201 },
202 false,
203 )
204 .await
205 }
206
207 /// Send a `Listen` message to the tracker.
208 ///
209 /// Returns `None` if the tracker is shut down.
210 pub async fn listen(&mut self, public_key: C) -> Option<Reservation<C>> {
211 self.0
212 .request(|reservation| Message::Listen {
213 public_key,
214 reservation,
215 })
216 .await
217 .flatten()
218 }
219}
220
221/// Allows releasing reservations
222#[derive(Clone, Debug)]
223pub struct Releaser<C: PublicKey> {
224 sender: UnboundedMailbox<Message<C>>,
225}
226
227impl<C: PublicKey> Releaser<C> {
228 /// Create a new releaser.
229 pub(crate) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
230 Self { sender }
231 }
232
233 /// Release a reservation.
234 pub fn release(&mut self, metadata: Metadata<C>) {
235 self.sender.0.send_lossy(Message::Release { metadata });
236 }
237}
238
239/// Mechanism to register authorized peers.
240///
241/// Peers that are not explicitly authorized
242/// will be blocked by commonware-p2p.
243#[derive(Debug, Clone)]
244pub struct Oracle<C: PublicKey> {
245 sender: UnboundedMailbox<Message<C>>,
246}
247
248impl<C: PublicKey> Oracle<C> {
249 pub(super) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
250 Self { sender }
251 }
252}
253
254impl<C: PublicKey> crate::Provider for Oracle<C> {
255 type PublicKey = C;
256
257 async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
258 self.sender
259 .0
260 .request(|responder| Message::PeerSet {
261 index: id,
262 responder,
263 })
264 .await
265 .flatten()
266 }
267
268 async fn subscribe(
269 &mut self,
270 ) -> mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)> {
271 self.sender
272 .0
273 .request(|responder| Message::Subscribe { responder })
274 .await
275 .unwrap_or_else(|| {
276 let (_, rx) = mpsc::unbounded_channel();
277 rx
278 })
279 }
280}
281
282impl<C: PublicKey> crate::Manager for Oracle<C> {
283 async fn track(&mut self, index: u64, peers: Set<Self::PublicKey>) {
284 self.sender.0.send_lossy(Message::Register { index, peers });
285 }
286}
287
288impl<C: PublicKey> crate::Blocker for Oracle<C> {
289 type PublicKey = C;
290
291 async fn block(&mut self, public_key: Self::PublicKey) {
292 self.sender.0.send_lossy(Message::Block { public_key });
293 }
294}