commonware_p2p/authenticated/discovery/actors/tracker/
ingress.rs

1use super::Reservation;
2use crate::authenticated::{
3    discovery::{
4        actors::{peer, tracker::Metadata},
5        types,
6    },
7    Mailbox,
8};
9use commonware_cryptography::PublicKey;
10use commonware_runtime::{Metrics, Spawner};
11use futures::{
12    channel::{mpsc, oneshot},
13    SinkExt,
14};
15
16/// Messages that can be sent to the tracker actor.
17pub enum Message<E: Spawner + Metrics, C: PublicKey> {
18    // ---------- Used by oracle ----------
19    /// Register a peer set at a given index.
20    ///
21    /// The vector of peers must be sorted in ascending order by public key.
22    Register { index: u64, peers: Vec<C> },
23
24    // ---------- Used by blocker ----------
25    /// Block a peer, disconnecting them if currently connected and preventing future connections
26    /// for as long as the peer remains in at least one active peer set.
27    Block { public_key: C },
28
29    // ---------- Used by peer ----------
30    /// Notify the tracker that a peer has been successfully connected, and that a
31    /// [types::Payload::Peers] message (containing solely the local node's information) should be
32    /// sent to the peer.
33    Connect {
34        /// The public key of the peer.
35        public_key: C,
36
37        /// `true` if we are the dialer, `false` if we are the listener.
38        dialer: bool,
39
40        /// The mailbox of the peer actor.
41        peer: Mailbox<peer::Message<C>>,
42    },
43
44    /// Ready to send a [types::Payload::BitVec] message to a peer. This message doubles as a
45    /// keep-alive signal to the peer.
46    ///
47    /// This request is formed on a recurring interval.
48    Construct {
49        /// The public key of the peer.
50        public_key: C,
51
52        /// The mailbox of the peer actor.
53        peer: Mailbox<peer::Message<C>>,
54    },
55
56    /// Notify the tracker that a [types::Payload::BitVec] message has been received from a peer.
57    ///
58    /// The tracker will construct a [types::Payload::Peers] message in response.
59    BitVec {
60        /// The bit vector received.
61        bit_vec: types::BitVec,
62
63        /// The mailbox of the peer actor.
64        peer: Mailbox<peer::Message<C>>,
65    },
66
67    /// Notify the tracker that a [types::Payload::Peers] message has been received from a peer.
68    Peers {
69        /// The list of peers received.
70        peers: Vec<types::PeerInfo<C>>,
71
72        /// The mailbox of the peer actor.
73        peer: Mailbox<peer::Message<C>>,
74    },
75
76    // ---------- Used by dialer ----------
77    /// Request a list of dialable peers.
78    Dialable {
79        /// One-shot channel to send the list of dialable peers.
80        responder: oneshot::Sender<Vec<C>>,
81    },
82
83    /// Request a reservation for a particular peer to dial.
84    ///
85    /// The tracker will respond with an [Option<Reservation<E, C>>], which will be `None` if the
86    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
87    /// has an active reservation).
88    Dial {
89        /// The public key of the peer to reserve.
90        public_key: C,
91
92        /// sender to respond with the reservation.
93        reservation: oneshot::Sender<Option<Reservation<E, C>>>,
94    },
95
96    // ---------- Used by listener ----------
97    /// Check if we should listen to a peer.
98    Listenable {
99        /// The public key of the peer to check.
100        public_key: C,
101
102        /// The sender to respond with the listenable status.
103        responder: oneshot::Sender<bool>,
104    },
105
106    /// Request a reservation for a particular peer.
107    ///
108    /// The tracker will respond with an [Option<Reservation<E, C>>], which will be `None` if  the
109    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
110    /// has an active reservation).
111    Listen {
112        /// The public key of the peer to reserve.
113        public_key: C,
114
115        /// The sender to respond with the reservation.
116        reservation: oneshot::Sender<Option<Reservation<E, C>>>,
117    },
118
119    // ---------- Used by reservation ----------
120    /// Release a reservation.
121    Release {
122        /// The metadata of the reservation to release.
123        metadata: Metadata<C>,
124    },
125}
126
127impl<E: Spawner + Metrics, C: PublicKey> Mailbox<Message<E, C>> {
128    /// Send a `Connect` message to the tracker.
129    pub async fn connect(&mut self, public_key: C, dialer: bool, peer: Mailbox<peer::Message<C>>) {
130        self.send(Message::Connect {
131            public_key,
132            dialer,
133            peer,
134        })
135        .await
136        .unwrap();
137    }
138
139    /// Send a `Construct` message to the tracker.
140    pub async fn construct(&mut self, public_key: C, peer: Mailbox<peer::Message<C>>) {
141        self.send(Message::Construct { public_key, peer })
142            .await
143            .unwrap();
144    }
145
146    /// Send a `BitVec` message to the tracker.
147    pub async fn bit_vec(&mut self, bit_vec: types::BitVec, peer: Mailbox<peer::Message<C>>) {
148        self.send(Message::BitVec { bit_vec, peer }).await.unwrap();
149    }
150
151    /// Send a `Peers` message to the tracker.
152    pub async fn peers(&mut self, peers: Vec<types::PeerInfo<C>>, peer: Mailbox<peer::Message<C>>) {
153        self.send(Message::Peers { peers, peer }).await.unwrap();
154    }
155
156    /// Send a `Block` message to the tracker.
157    pub async fn dialable(&mut self) -> Vec<C> {
158        let (sender, receiver) = oneshot::channel();
159        self.send(Message::Dialable { responder: sender })
160            .await
161            .unwrap();
162        receiver.await.unwrap()
163    }
164
165    /// Send a `Dial` message to the tracker.
166    pub async fn dial(&mut self, public_key: C) -> Option<Reservation<E, C>> {
167        let (tx, rx) = oneshot::channel();
168        self.send(Message::Dial {
169            public_key,
170            reservation: tx,
171        })
172        .await
173        .unwrap();
174        rx.await.unwrap()
175    }
176
177    /// Send a `Listenable` message to the tracker.
178    pub async fn listenable(&mut self, public_key: C) -> bool {
179        let (tx, rx) = oneshot::channel();
180        self.send(Message::Listenable {
181            public_key,
182            responder: tx,
183        })
184        .await
185        .unwrap();
186        rx.await.unwrap()
187    }
188
189    /// Send a `Listen` message to the tracker.
190    pub async fn listen(&mut self, public_key: C) -> Option<Reservation<E, C>> {
191        let (tx, rx) = oneshot::channel();
192        self.send(Message::Listen {
193            public_key,
194            reservation: tx,
195        })
196        .await
197        .unwrap();
198        rx.await.unwrap()
199    }
200}
201
202/// Allows releasing reservations
203#[derive(Clone)]
204pub struct Releaser<E: Spawner + Metrics, C: PublicKey> {
205    sender: mpsc::Sender<Message<E, C>>,
206}
207
208impl<E: Spawner + Metrics, C: PublicKey> Releaser<E, C> {
209    /// Create a new releaser.
210    pub(super) fn new(sender: mpsc::Sender<Message<E, C>>) -> Self {
211        Self { sender }
212    }
213
214    /// Try to release a reservation.
215    ///
216    /// Returns `true` if the reservation was released, `false` if the mailbox is full.
217    pub fn try_release(&mut self, metadata: Metadata<C>) -> bool {
218        let Err(e) = self.sender.try_send(Message::Release { metadata }) else {
219            return true;
220        };
221        assert!(
222            e.is_full(),
223            "Unexpected error trying to release reservation {e:?}"
224        );
225        false
226    }
227
228    /// Release a reservation.
229    ///
230    /// This method will block if the mailbox is full.
231    pub async fn release(&mut self, metadata: Metadata<C>) {
232        self.sender
233            .send(Message::Release { metadata })
234            .await
235            .unwrap();
236    }
237}
238
239/// Mechanism to register authorized peers.
240///
241/// Peers that are not explicitly authorized
242/// will be blocked by commonware-p2p.
243#[derive(Clone)]
244pub struct Oracle<E: Spawner + Metrics, C: PublicKey> {
245    sender: mpsc::Sender<Message<E, C>>,
246}
247
248impl<E: Spawner + Metrics, C: PublicKey> Oracle<E, C> {
249    pub(super) fn new(sender: mpsc::Sender<Message<E, C>>) -> Self {
250        Self { sender }
251    }
252
253    /// Register a set of authorized peers at a given index.
254    ///
255    /// These peer sets are used to construct a bit vector (sorted by public key)
256    /// to share knowledge about dialable IPs. If a peer does not yet have an index
257    /// associated with a bit vector, the discovery message will be dropped.
258    ///
259    /// # Parameters
260    ///
261    /// * `index` - Index of the set of authorized peers (like a blockchain height).
262    ///   Should be monotonically increasing.
263    /// * `peers` - Vector of authorized peers at an `index` (does not need to be sorted).
264    pub async fn register(&mut self, index: u64, peers: Vec<C>) {
265        let _ = self.sender.send(Message::Register { index, peers }).await;
266    }
267}
268
269impl<E: Spawner + Metrics, C: PublicKey> crate::Blocker for Oracle<E, C> {
270    type PublicKey = C;
271
272    async fn block(&mut self, public_key: Self::PublicKey) {
273        let _ = self.sender.send(Message::Block { public_key }).await;
274    }
275}