commonware_p2p/authenticated/actors/tracker/
ingress.rs

1use super::Reservation;
2use crate::authenticated::{actors::peer, types};
3use commonware_cryptography::Verifier;
4use commonware_runtime::{Metrics, Spawner};
5use futures::{
6    channel::{mpsc, oneshot},
7    SinkExt,
8};
9
10/// Messages that can be sent to the tracker actor.
11pub enum Message<E: Spawner + Metrics, C: Verifier> {
12    // ---------- Used by oracle ----------
13    /// Register a peer set at a given index.
14    ///
15    /// The vector of peers must be sorted in ascending order by public key.
16    Register {
17        index: u64,
18        peers: Vec<C::PublicKey>,
19    },
20
21    // ---------- Used by blocker ----------
22    /// Block a peer, disconnecting them if currently connected and preventing future connections
23    /// for as long as the peer remains in at least one active peer set.
24    Block { public_key: C::PublicKey },
25
26    // ---------- Used by peer ----------
27    /// Notify the tracker that a peer has been successfully connected, and that a
28    /// [`types::Payload::Peers`] message (containing solely the local node's information) should be
29    /// sent to the peer.
30    Connect {
31        /// The public key of the peer.
32        public_key: C::PublicKey,
33
34        /// `true` if we are the dialer, `false` if we are the listener.
35        dialer: bool,
36
37        /// The mailbox of the peer actor.
38        peer: peer::Mailbox<C>,
39    },
40
41    /// Ready to send a [`types::Payload::BitVec`] message to a peer. This message doubles as a
42    /// keep-alive signal to the peer.
43    ///
44    /// This request is formed on a recurring interval.
45    Construct {
46        /// The public key of the peer.
47        public_key: C::PublicKey,
48
49        /// The mailbox of the peer actor.
50        peer: peer::Mailbox<C>,
51    },
52
53    /// Notify the tracker that a [`types::Payload::BitVec`] message has been received from a peer.
54    ///
55    /// The tracker will construct a [`types::Payload::Peers`] message in response.
56    BitVec {
57        /// The bit vector received.
58        bit_vec: types::BitVec,
59
60        /// The mailbox of the peer actor.
61        peer: peer::Mailbox<C>,
62    },
63
64    /// Notify the tracker that a [`types::Payload::Peers`] message has been received from a peer.
65    Peers {
66        /// The list of peers received.
67        peers: Vec<types::PeerInfo<C>>,
68
69        /// The mailbox of the peer actor.
70        peer: peer::Mailbox<C>,
71    },
72
73    // ---------- Used by dialer ----------
74    /// Request a list of dialable peers.
75    Dialable {
76        /// One-shot channel to send the list of dialable peers.
77        responder: oneshot::Sender<Vec<C::PublicKey>>,
78    },
79
80    /// Request a reservation for a particular peer to dial.
81    ///
82    /// The tracker will respond with an [`Option<Reservation<E, C>>`], which will be `None` if the
83    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
84    /// has an active reservation).
85    Dial {
86        /// The public key of the peer to reserve.
87        public_key: C::PublicKey,
88
89        /// sender to respond with the reservation.
90        reservation: oneshot::Sender<Option<Reservation<E, C::PublicKey>>>,
91    },
92
93    // ---------- Used by listener ----------
94    /// Request a reservation for a particular peer.
95    ///
96    /// The tracker will respond with an [`Option<Reservation<E, C>>`], which will be `None` if  the
97    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
98    /// has an active reservation).
99    Listen {
100        /// The public key of the peer to reserve.
101        public_key: C::PublicKey,
102
103        /// The sender to respond with the reservation.
104        reservation: oneshot::Sender<Option<Reservation<E, C::PublicKey>>>,
105    },
106}
107
108/// Mailbox for sending messages to the tracker actor.
109#[derive(Clone)]
110pub struct Mailbox<E: Spawner + Metrics, C: Verifier> {
111    sender: mpsc::Sender<Message<E, C>>,
112}
113
114impl<E: Spawner + Metrics, C: Verifier> Mailbox<E, C> {
115    /// Create a new mailbox for the tracker.
116    pub(super) fn new(sender: mpsc::Sender<Message<E, C>>) -> Self {
117        Self { sender }
118    }
119
120    /// Send a `Connect` message to the tracker.
121    pub async fn connect(
122        &mut self,
123        public_key: C::PublicKey,
124        dialer: bool,
125        peer: peer::Mailbox<C>,
126    ) {
127        self.sender
128            .send(Message::Connect {
129                public_key,
130                dialer,
131                peer,
132            })
133            .await
134            .unwrap();
135    }
136
137    /// Send a `Construct` message to the tracker.
138    pub async fn construct(&mut self, public_key: C::PublicKey, peer: peer::Mailbox<C>) {
139        self.sender
140            .send(Message::Construct { public_key, peer })
141            .await
142            .unwrap();
143    }
144
145    /// Send a `BitVec` message to the tracker.
146    pub async fn bit_vec(&mut self, bit_vec: types::BitVec, peer: peer::Mailbox<C>) {
147        self.sender
148            .send(Message::BitVec { bit_vec, peer })
149            .await
150            .unwrap();
151    }
152
153    /// Send a `Peers` message to the tracker.
154    pub async fn peers(&mut self, peers: Vec<types::PeerInfo<C>>, peer: peer::Mailbox<C>) {
155        self.sender
156            .send(Message::Peers { peers, peer })
157            .await
158            .unwrap();
159    }
160
161    /// Send a `Block` message to the tracker.
162    pub async fn dialable(&mut self) -> Vec<C::PublicKey> {
163        let (sender, receiver) = oneshot::channel();
164        self.sender
165            .send(Message::Dialable { responder: sender })
166            .await
167            .unwrap();
168        receiver.await.unwrap()
169    }
170
171    /// Send a `Dial` message to the tracker.
172    pub async fn dial(&mut self, public_key: C::PublicKey) -> Option<Reservation<E, C::PublicKey>> {
173        let (tx, rx) = oneshot::channel();
174        self.sender
175            .send(Message::Dial {
176                public_key,
177                reservation: tx,
178            })
179            .await
180            .unwrap();
181        rx.await.unwrap()
182    }
183
184    /// Send a `Listen` message to the tracker.
185    pub async fn listen(
186        &mut self,
187        public_key: C::PublicKey,
188    ) -> Option<Reservation<E, C::PublicKey>> {
189        let (tx, rx) = oneshot::channel();
190        self.sender
191            .send(Message::Listen {
192                public_key,
193                reservation: tx,
194            })
195            .await
196            .unwrap();
197        rx.await.unwrap()
198    }
199}
200
201/// Mechanism to register authorized peers.
202///
203/// Peers that are not explicitly authorized
204/// will be blocked by commonware-p2p.
205#[derive(Clone)]
206pub struct Oracle<E: Spawner + Metrics, C: Verifier> {
207    sender: mpsc::Sender<Message<E, C>>,
208}
209
210impl<E: Spawner + Metrics, C: Verifier> Oracle<E, C> {
211    pub(super) fn new(sender: mpsc::Sender<Message<E, C>>) -> Self {
212        Self { sender }
213    }
214
215    /// Register a set of authorized peers at a given index.
216    ///
217    /// These peer sets are used to construct a bit vector (sorted by public key)
218    /// to share knowledge about dialable IPs. If a peer does not yet have an index
219    /// associated with a bit vector, the discovery message will be dropped.
220    ///
221    /// # Parameters
222    ///
223    /// * `index` - Index of the set of authorized peers (like a blockchain height).
224    ///   Should be monotonically increasing.
225    /// * `peers` - Vector of authorized peers at an `index` (does not need to be sorted).
226    pub async fn register(&mut self, index: u64, peers: Vec<C::PublicKey>) {
227        let _ = self.sender.send(Message::Register { index, peers }).await;
228    }
229}
230
231impl<E: Spawner + Metrics, C: Verifier> crate::Blocker for Oracle<E, C> {
232    type PublicKey = C::PublicKey;
233
234    async fn block(&mut self, public_key: Self::PublicKey) {
235        let _ = self.sender.send(Message::Block { public_key }).await;
236    }
237}