commonware_p2p/authenticated/discovery/actors/tracker/ingress.rs
1use super::Reservation;
2use crate::authenticated::{
3 discovery::{
4 actors::{peer, tracker::Metadata},
5 types,
6 },
7 Mailbox,
8};
9use commonware_cryptography::PublicKey;
10use commonware_runtime::{Metrics, Spawner};
11use futures::{
12 channel::{mpsc, oneshot},
13 SinkExt,
14};
15
16/// Messages that can be sent to the tracker actor.
17pub enum Message<E: Spawner + Metrics, C: PublicKey> {
18 // ---------- Used by oracle ----------
19 /// Register a peer set at a given index.
20 ///
21 /// The vector of peers must be sorted in ascending order by public key.
22 Register { index: u64, peers: Vec<C> },
23
24 // ---------- Used by blocker ----------
25 /// Block a peer, disconnecting them if currently connected and preventing future connections
26 /// for as long as the peer remains in at least one active peer set.
27 Block { public_key: C },
28
29 // ---------- Used by peer ----------
30 /// Notify the tracker that a peer has been successfully connected, and that a
31 /// [types::Payload::Peers] message (containing solely the local node's information) should be
32 /// sent to the peer.
33 Connect {
34 /// The public key of the peer.
35 public_key: C,
36
37 /// `true` if we are the dialer, `false` if we are the listener.
38 dialer: bool,
39
40 /// The mailbox of the peer actor.
41 peer: Mailbox<peer::Message<C>>,
42 },
43
44 /// Ready to send a [types::Payload::BitVec] message to a peer. This message doubles as a
45 /// keep-alive signal to the peer.
46 ///
47 /// This request is formed on a recurring interval.
48 Construct {
49 /// The public key of the peer.
50 public_key: C,
51
52 /// The mailbox of the peer actor.
53 peer: Mailbox<peer::Message<C>>,
54 },
55
56 /// Notify the tracker that a [types::Payload::BitVec] message has been received from a peer.
57 ///
58 /// The tracker will construct a [types::Payload::Peers] message in response.
59 BitVec {
60 /// The bit vector received.
61 bit_vec: types::BitVec,
62
63 /// The mailbox of the peer actor.
64 peer: Mailbox<peer::Message<C>>,
65 },
66
67 /// Notify the tracker that a [types::Payload::Peers] message has been received from a peer.
68 Peers {
69 /// The list of peers received.
70 peers: Vec<types::PeerInfo<C>>,
71
72 /// The mailbox of the peer actor.
73 peer: Mailbox<peer::Message<C>>,
74 },
75
76 // ---------- Used by dialer ----------
77 /// Request a list of dialable peers.
78 Dialable {
79 /// One-shot channel to send the list of dialable peers.
80 responder: oneshot::Sender<Vec<C>>,
81 },
82
83 /// Request a reservation for a particular peer to dial.
84 ///
85 /// The tracker will respond with an [Option<Reservation<E, C>>], which will be `None` if the
86 /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
87 /// has an active reservation).
88 Dial {
89 /// The public key of the peer to reserve.
90 public_key: C,
91
92 /// sender to respond with the reservation.
93 reservation: oneshot::Sender<Option<Reservation<E, C>>>,
94 },
95
96 // ---------- Used by listener ----------
97 /// Check if we should listen to a peer.
98 Listenable {
99 /// The public key of the peer to check.
100 public_key: C,
101
102 /// The sender to respond with the listenable status.
103 responder: oneshot::Sender<bool>,
104 },
105
106 /// Request a reservation for a particular peer.
107 ///
108 /// The tracker will respond with an [Option<Reservation<E, C>>], which will be `None` if the
109 /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
110 /// has an active reservation).
111 Listen {
112 /// The public key of the peer to reserve.
113 public_key: C,
114
115 /// The sender to respond with the reservation.
116 reservation: oneshot::Sender<Option<Reservation<E, C>>>,
117 },
118
119 // ---------- Used by reservation ----------
120 /// Release a reservation.
121 Release {
122 /// The metadata of the reservation to release.
123 metadata: Metadata<C>,
124 },
125}
126
127impl<E: Spawner + Metrics, C: PublicKey> Mailbox<Message<E, C>> {
128 /// Send a `Connect` message to the tracker.
129 pub async fn connect(&mut self, public_key: C, dialer: bool, peer: Mailbox<peer::Message<C>>) {
130 self.send(Message::Connect {
131 public_key,
132 dialer,
133 peer,
134 })
135 .await
136 .unwrap();
137 }
138
139 /// Send a `Construct` message to the tracker.
140 pub async fn construct(&mut self, public_key: C, peer: Mailbox<peer::Message<C>>) {
141 self.send(Message::Construct { public_key, peer })
142 .await
143 .unwrap();
144 }
145
146 /// Send a `BitVec` message to the tracker.
147 pub async fn bit_vec(&mut self, bit_vec: types::BitVec, peer: Mailbox<peer::Message<C>>) {
148 self.send(Message::BitVec { bit_vec, peer }).await.unwrap();
149 }
150
151 /// Send a `Peers` message to the tracker.
152 pub async fn peers(&mut self, peers: Vec<types::PeerInfo<C>>, peer: Mailbox<peer::Message<C>>) {
153 self.send(Message::Peers { peers, peer }).await.unwrap();
154 }
155
156 /// Send a `Block` message to the tracker.
157 pub async fn dialable(&mut self) -> Vec<C> {
158 let (sender, receiver) = oneshot::channel();
159 self.send(Message::Dialable { responder: sender })
160 .await
161 .unwrap();
162 receiver.await.unwrap()
163 }
164
165 /// Send a `Dial` message to the tracker.
166 pub async fn dial(&mut self, public_key: C) -> Option<Reservation<E, C>> {
167 let (tx, rx) = oneshot::channel();
168 self.send(Message::Dial {
169 public_key,
170 reservation: tx,
171 })
172 .await
173 .unwrap();
174 rx.await.unwrap()
175 }
176
177 /// Send a `Listenable` message to the tracker.
178 pub async fn listenable(&mut self, public_key: C) -> bool {
179 let (tx, rx) = oneshot::channel();
180 self.send(Message::Listenable {
181 public_key,
182 responder: tx,
183 })
184 .await
185 .unwrap();
186 rx.await.unwrap()
187 }
188
189 /// Send a `Listen` message to the tracker.
190 pub async fn listen(&mut self, public_key: C) -> Option<Reservation<E, C>> {
191 let (tx, rx) = oneshot::channel();
192 self.send(Message::Listen {
193 public_key,
194 reservation: tx,
195 })
196 .await
197 .unwrap();
198 rx.await.unwrap()
199 }
200}
201
202/// Allows releasing reservations
203#[derive(Clone)]
204pub struct Releaser<E: Spawner + Metrics, C: PublicKey> {
205 sender: mpsc::Sender<Message<E, C>>,
206}
207
208impl<E: Spawner + Metrics, C: PublicKey> Releaser<E, C> {
209 /// Create a new releaser.
210 pub(super) fn new(sender: mpsc::Sender<Message<E, C>>) -> Self {
211 Self { sender }
212 }
213
214 /// Try to release a reservation.
215 ///
216 /// Returns `true` if the reservation was released, `false` if the mailbox is full.
217 pub fn try_release(&mut self, metadata: Metadata<C>) -> bool {
218 let Err(e) = self.sender.try_send(Message::Release { metadata }) else {
219 return true;
220 };
221 assert!(
222 e.is_full(),
223 "Unexpected error trying to release reservation {e:?}"
224 );
225 false
226 }
227
228 /// Release a reservation.
229 ///
230 /// This method will block if the mailbox is full.
231 pub async fn release(&mut self, metadata: Metadata<C>) {
232 self.sender
233 .send(Message::Release { metadata })
234 .await
235 .unwrap();
236 }
237}
238
239/// Mechanism to register authorized peers.
240///
241/// Peers that are not explicitly authorized
242/// will be blocked by commonware-p2p.
243#[derive(Clone)]
244pub struct Oracle<E: Spawner + Metrics, C: PublicKey> {
245 sender: mpsc::Sender<Message<E, C>>,
246}
247
248impl<E: Spawner + Metrics, C: PublicKey> Oracle<E, C> {
249 pub(super) fn new(sender: mpsc::Sender<Message<E, C>>) -> Self {
250 Self { sender }
251 }
252
253 /// Register a set of authorized peers at a given index.
254 ///
255 /// These peer sets are used to construct a bit vector (sorted by public key)
256 /// to share knowledge about dialable IPs. If a peer does not yet have an index
257 /// associated with a bit vector, the discovery message will be dropped.
258 ///
259 /// # Parameters
260 ///
261 /// * `index` - Index of the set of authorized peers (like a blockchain height).
262 /// Should be monotonically increasing.
263 /// * `peers` - Vector of authorized peers at an `index` (does not need to be sorted).
264 pub async fn register(&mut self, index: u64, peers: Vec<C>) {
265 let _ = self.sender.send(Message::Register { index, peers }).await;
266 }
267}
268
269impl<E: Spawner + Metrics, C: PublicKey> crate::Blocker for Oracle<E, C> {
270 type PublicKey = C;
271
272 async fn block(&mut self, public_key: Self::PublicKey) {
273 let _ = self.sender.send(Message::Block { public_key }).await;
274 }
275}