commonware_p2p/authenticated/actors/tracker/ingress.rs
1use super::Reservation;
2use crate::authenticated::{actors::peer, types};
3use commonware_cryptography::Verifier;
4use commonware_runtime::{Metrics, Spawner};
5use futures::{
6 channel::{mpsc, oneshot},
7 SinkExt,
8};
9
/// Messages that can be sent to the tracker actor.
///
/// `E` only appears as a parameter of the [`Reservation`] handed back through the `Dial` and
/// `Listen` variants; `C` supplies the `PublicKey` type that identifies peers and parameterizes
/// the peer mailbox and peer-info types carried by the other variants.
pub enum Message<E: Spawner + Metrics, C: Verifier> {
    // ---------- Used by oracle ----------
    /// Register a peer set at a given index.
    ///
    /// The vector of peers must be sorted in ascending order by public key.
    // NOTE(review): `Oracle::register` documents its `peers` argument as not needing to be
    // sorted and forwards it unchanged; confirm which side is responsible for sorting.
    Register {
        /// The index of the peer set.
        index: u64,
        /// The public keys of the peers in the set.
        peers: Vec<C::PublicKey>,
    },

    // ---------- Used by blocker ----------
    /// Block a peer, disconnecting them if currently connected and preventing future connections
    /// for as long as the peer remains in at least one active peer set.
    Block {
        /// The public key of the peer to block.
        public_key: C::PublicKey,
    },

    // ---------- Used by peer ----------
    /// Notify the tracker that a peer has been successfully connected, and that a
    /// [`types::Payload::Peers`] message (containing solely the local node's information) should be
    /// sent to the peer.
    Connect {
        /// The public key of the peer.
        public_key: C::PublicKey,

        /// `true` if we are the dialer, `false` if we are the listener.
        dialer: bool,

        /// The mailbox of the peer actor.
        peer: peer::Mailbox<C>,
    },

    /// Ready to send a [`types::Payload::BitVec`] message to a peer. This message doubles as a
    /// keep-alive signal to the peer.
    ///
    /// This request is formed on a recurring interval.
    Construct {
        /// The public key of the peer.
        public_key: C::PublicKey,

        /// The mailbox of the peer actor.
        peer: peer::Mailbox<C>,
    },

    /// Notify the tracker that a [`types::Payload::BitVec`] message has been received from a peer.
    ///
    /// The tracker will construct a [`types::Payload::Peers`] message in response.
    BitVec {
        /// The bit vector received.
        bit_vec: types::BitVec,

        /// The mailbox of the peer actor.
        peer: peer::Mailbox<C>,
    },

    /// Notify the tracker that a [`types::Payload::Peers`] message has been received from a peer.
    Peers {
        /// The list of peers received.
        peers: Vec<types::PeerInfo<C>>,

        /// The mailbox of the peer actor.
        peer: peer::Mailbox<C>,
    },

    // ---------- Used by dialer ----------
    /// Request a list of dialable peers.
    Dialable {
        /// One-shot channel to send the list of dialable peers.
        responder: oneshot::Sender<Vec<C::PublicKey>>,
    },

    /// Request a reservation for a particular peer to dial.
    ///
    /// The tracker will respond with an `Option<Reservation<E, C::PublicKey>>`, which will be
    /// `None` if the reservation cannot be granted (e.g., if the peer is already connected,
    /// blocked or already has an active reservation).
    Dial {
        /// The public key of the peer to reserve.
        public_key: C::PublicKey,

        /// The sender to respond with the reservation.
        reservation: oneshot::Sender<Option<Reservation<E, C::PublicKey>>>,
    },

    // ---------- Used by listener ----------
    /// Request a reservation for a particular peer.
    ///
    /// The tracker will respond with an `Option<Reservation<E, C::PublicKey>>`, which will be
    /// `None` if the reservation cannot be granted (e.g., if the peer is already connected,
    /// blocked or already has an active reservation).
    Listen {
        /// The public key of the peer to reserve.
        public_key: C::PublicKey,

        /// The sender to respond with the reservation.
        reservation: oneshot::Sender<Option<Reservation<E, C::PublicKey>>>,
    },
}
107
/// Mailbox for sending messages to the tracker actor.
///
/// Holds only the sending half of the channel; each method on the mailbox wraps its arguments in
/// the matching [`Message`] variant and pushes it through that sender.
#[derive(Clone)]
pub struct Mailbox<E: Spawner + Metrics, C: Verifier> {
    /// Sending half of the channel that carries [`Message`]s.
    sender: mpsc::Sender<Message<E, C>>,
}
113
114impl<E: Spawner + Metrics, C: Verifier> Mailbox<E, C> {
115 /// Create a new mailbox for the tracker.
116 pub(super) fn new(sender: mpsc::Sender<Message<E, C>>) -> Self {
117 Self { sender }
118 }
119
120 /// Send a `Connect` message to the tracker.
121 pub async fn connect(
122 &mut self,
123 public_key: C::PublicKey,
124 dialer: bool,
125 peer: peer::Mailbox<C>,
126 ) {
127 self.sender
128 .send(Message::Connect {
129 public_key,
130 dialer,
131 peer,
132 })
133 .await
134 .unwrap();
135 }
136
137 /// Send a `Construct` message to the tracker.
138 pub async fn construct(&mut self, public_key: C::PublicKey, peer: peer::Mailbox<C>) {
139 self.sender
140 .send(Message::Construct { public_key, peer })
141 .await
142 .unwrap();
143 }
144
145 /// Send a `BitVec` message to the tracker.
146 pub async fn bit_vec(&mut self, bit_vec: types::BitVec, peer: peer::Mailbox<C>) {
147 self.sender
148 .send(Message::BitVec { bit_vec, peer })
149 .await
150 .unwrap();
151 }
152
153 /// Send a `Peers` message to the tracker.
154 pub async fn peers(&mut self, peers: Vec<types::PeerInfo<C>>, peer: peer::Mailbox<C>) {
155 self.sender
156 .send(Message::Peers { peers, peer })
157 .await
158 .unwrap();
159 }
160
161 /// Send a `Block` message to the tracker.
162 pub async fn dialable(&mut self) -> Vec<C::PublicKey> {
163 let (sender, receiver) = oneshot::channel();
164 self.sender
165 .send(Message::Dialable { responder: sender })
166 .await
167 .unwrap();
168 receiver.await.unwrap()
169 }
170
171 /// Send a `Dial` message to the tracker.
172 pub async fn dial(&mut self, public_key: C::PublicKey) -> Option<Reservation<E, C::PublicKey>> {
173 let (tx, rx) = oneshot::channel();
174 self.sender
175 .send(Message::Dial {
176 public_key,
177 reservation: tx,
178 })
179 .await
180 .unwrap();
181 rx.await.unwrap()
182 }
183
184 /// Send a `Listen` message to the tracker.
185 pub async fn listen(
186 &mut self,
187 public_key: C::PublicKey,
188 ) -> Option<Reservation<E, C::PublicKey>> {
189 let (tx, rx) = oneshot::channel();
190 self.sender
191 .send(Message::Listen {
192 public_key,
193 reservation: tx,
194 })
195 .await
196 .unwrap();
197 rx.await.unwrap()
198 }
199}
200
/// Mechanism to register authorized peers.
///
/// Peers that are not explicitly authorized
/// will be blocked by commonware-p2p.
#[derive(Clone)]
pub struct Oracle<E: Spawner + Metrics, C: Verifier> {
    /// Sending half of the channel that carries [`Message`]s.
    sender: mpsc::Sender<Message<E, C>>,
}
209
210impl<E: Spawner + Metrics, C: Verifier> Oracle<E, C> {
211 pub(super) fn new(sender: mpsc::Sender<Message<E, C>>) -> Self {
212 Self { sender }
213 }
214
215 /// Register a set of authorized peers at a given index.
216 ///
217 /// These peer sets are used to construct a bit vector (sorted by public key)
218 /// to share knowledge about dialable IPs. If a peer does not yet have an index
219 /// associated with a bit vector, the discovery message will be dropped.
220 ///
221 /// # Parameters
222 ///
223 /// * `index` - Index of the set of authorized peers (like a blockchain height).
224 /// Should be monotonically increasing.
225 /// * `peers` - Vector of authorized peers at an `index` (does not need to be sorted).
226 pub async fn register(&mut self, index: u64, peers: Vec<C::PublicKey>) {
227 let _ = self.sender.send(Message::Register { index, peers }).await;
228 }
229}
230
231impl<E: Spawner + Metrics, C: Verifier> crate::Blocker for Oracle<E, C> {
232 type PublicKey = C::PublicKey;
233
234 async fn block(&mut self, public_key: Self::PublicKey) {
235 let _ = self.sender.send(Message::Block { public_key }).await;
236 }
237}