commonware_p2p/authenticated/actors/tracker/ingress.rs
1use super::Reservation;
2use crate::authenticated::{actors::peer, types};
3use commonware_cryptography::PublicKey;
4use commonware_runtime::{Metrics, Spawner};
5use futures::{
6 channel::{mpsc, oneshot},
7 SinkExt,
8};
9
/// Messages that can be sent to the tracker actor.
///
/// Variants are grouped below by the component that sends them: the oracle, the blocker, a
/// peer actor, the dialer and the listener.
pub enum Message<E: Spawner + Metrics, C: PublicKey> {
    // ---------- Used by oracle ----------
    /// Register a peer set at a given index.
    ///
    /// The vector of peers must be sorted in ascending order by public key.
    // NOTE(review): `Oracle::register` documents its `peers` argument as "does not need to be
    // sorted" and forwards it into this variant unchanged — confirm which contract the tracker
    // actually relies on.
    Register { index: u64, peers: Vec<C> },

    // ---------- Used by blocker ----------
    /// Block a peer, disconnecting them if currently connected and preventing future connections
    /// for as long as the peer remains in at least one active peer set.
    Block { public_key: C },

    // ---------- Used by peer ----------
    /// Notify the tracker that a peer has been successfully connected, and that a
    /// [`types::Payload::Peers`] message (containing solely the local node's information) should be
    /// sent to the peer.
    Connect {
        /// The public key of the peer.
        public_key: C,

        /// `true` if we are the dialer, `false` if we are the listener.
        dialer: bool,

        /// The mailbox of the peer actor.
        peer: peer::Mailbox<C>,
    },

    /// Ready to send a [`types::Payload::BitVec`] message to a peer. This message doubles as a
    /// keep-alive signal to the peer.
    ///
    /// This request is formed on a recurring interval.
    Construct {
        /// The public key of the peer.
        public_key: C,

        /// The mailbox of the peer actor.
        peer: peer::Mailbox<C>,
    },

    /// Notify the tracker that a [`types::Payload::BitVec`] message has been received from a peer.
    ///
    /// The tracker will construct a [`types::Payload::Peers`] message in response.
    BitVec {
        /// The bit vector received.
        bit_vec: types::BitVec,

        /// The mailbox of the peer actor.
        peer: peer::Mailbox<C>,
    },

    /// Notify the tracker that a [`types::Payload::Peers`] message has been received from a peer.
    Peers {
        /// The list of peers received.
        peers: Vec<types::PeerInfo<C>>,

        /// The mailbox of the peer actor.
        peer: peer::Mailbox<C>,
    },

    // ---------- Used by dialer ----------
    /// Request a list of dialable peers.
    Dialable {
        /// One-shot channel to send the list of dialable peers.
        responder: oneshot::Sender<Vec<C>>,
    },

    /// Request a reservation for a particular peer to dial.
    ///
    /// The tracker will respond with an [`Option<Reservation<E, C>>`], which will be `None` if the
    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
    /// has an active reservation).
    Dial {
        /// The public key of the peer to reserve.
        public_key: C,

        /// The sender to respond with the reservation.
        reservation: oneshot::Sender<Option<Reservation<E, C>>>,
    },

    // ---------- Used by listener ----------
    /// Request a reservation for a particular peer.
    ///
    /// The tracker will respond with an [`Option<Reservation<E, C>>`], which will be `None` if the
    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
    /// has an active reservation).
    Listen {
        /// The public key of the peer to reserve.
        public_key: C,

        /// The sender to respond with the reservation.
        reservation: oneshot::Sender<Option<Reservation<E, C>>>,
    },
}
104
/// Mailbox for sending messages to the tracker actor.
///
/// A thin wrapper around the sending half of the tracker's message channel. The type derives
/// `Clone`, which clones the wrapped sender.
#[derive(Clone)]
pub struct Mailbox<E: Spawner + Metrics, C: PublicKey> {
    /// Sending half of the channel that carries [`Message`]s to the tracker.
    sender: mpsc::Sender<Message<E, C>>,
}
110
111impl<E: Spawner + Metrics, C: PublicKey> Mailbox<E, C> {
112 /// Create a new mailbox for the tracker.
113 pub(super) fn new(sender: mpsc::Sender<Message<E, C>>) -> Self {
114 Self { sender }
115 }
116
117 /// Send a `Connect` message to the tracker.
118 pub async fn connect(&mut self, public_key: C, dialer: bool, peer: peer::Mailbox<C>) {
119 self.sender
120 .send(Message::Connect {
121 public_key,
122 dialer,
123 peer,
124 })
125 .await
126 .unwrap();
127 }
128
129 /// Send a `Construct` message to the tracker.
130 pub async fn construct(&mut self, public_key: C, peer: peer::Mailbox<C>) {
131 self.sender
132 .send(Message::Construct { public_key, peer })
133 .await
134 .unwrap();
135 }
136
137 /// Send a `BitVec` message to the tracker.
138 pub async fn bit_vec(&mut self, bit_vec: types::BitVec, peer: peer::Mailbox<C>) {
139 self.sender
140 .send(Message::BitVec { bit_vec, peer })
141 .await
142 .unwrap();
143 }
144
145 /// Send a `Peers` message to the tracker.
146 pub async fn peers(&mut self, peers: Vec<types::PeerInfo<C>>, peer: peer::Mailbox<C>) {
147 self.sender
148 .send(Message::Peers { peers, peer })
149 .await
150 .unwrap();
151 }
152
153 /// Send a `Block` message to the tracker.
154 pub async fn dialable(&mut self) -> Vec<C> {
155 let (sender, receiver) = oneshot::channel();
156 self.sender
157 .send(Message::Dialable { responder: sender })
158 .await
159 .unwrap();
160 receiver.await.unwrap()
161 }
162
163 /// Send a `Dial` message to the tracker.
164 pub async fn dial(&mut self, public_key: C) -> Option<Reservation<E, C>> {
165 let (tx, rx) = oneshot::channel();
166 self.sender
167 .send(Message::Dial {
168 public_key,
169 reservation: tx,
170 })
171 .await
172 .unwrap();
173 rx.await.unwrap()
174 }
175
176 /// Send a `Listen` message to the tracker.
177 pub async fn listen(&mut self, public_key: C) -> Option<Reservation<E, C>> {
178 let (tx, rx) = oneshot::channel();
179 self.sender
180 .send(Message::Listen {
181 public_key,
182 reservation: tx,
183 })
184 .await
185 .unwrap();
186 rx.await.unwrap()
187 }
188}
189
/// Mechanism to register authorized peers.
///
/// Peers that are not explicitly authorized
/// will be blocked by commonware-p2p.
///
/// The oracle talks to the tracker over the same [`Message`] channel type used by
/// [`Mailbox`], and also implements `crate::Blocker`.
#[derive(Clone)]
pub struct Oracle<E: Spawner + Metrics, C: PublicKey> {
    /// Sending half of the channel that carries [`Message`]s to the tracker.
    sender: mpsc::Sender<Message<E, C>>,
}
198
199impl<E: Spawner + Metrics, C: PublicKey> Oracle<E, C> {
200 pub(super) fn new(sender: mpsc::Sender<Message<E, C>>) -> Self {
201 Self { sender }
202 }
203
204 /// Register a set of authorized peers at a given index.
205 ///
206 /// These peer sets are used to construct a bit vector (sorted by public key)
207 /// to share knowledge about dialable IPs. If a peer does not yet have an index
208 /// associated with a bit vector, the discovery message will be dropped.
209 ///
210 /// # Parameters
211 ///
212 /// * `index` - Index of the set of authorized peers (like a blockchain height).
213 /// Should be monotonically increasing.
214 /// * `peers` - Vector of authorized peers at an `index` (does not need to be sorted).
215 pub async fn register(&mut self, index: u64, peers: Vec<C>) {
216 let _ = self.sender.send(Message::Register { index, peers }).await;
217 }
218}
219
220impl<E: Spawner + Metrics, C: PublicKey> crate::Blocker for Oracle<E, C> {
221 type PublicKey = C;
222
223 async fn block(&mut self, public_key: Self::PublicKey) {
224 let _ = self.sender.send(Message::Block { public_key }).await;
225 }
226}