commonware_p2p/authenticated/discovery/actors/tracker/
ingress.rs

1use super::Reservation;
2use crate::authenticated::{
3    discovery::{
4        actors::{peer, tracker::Metadata},
5        types,
6    },
7    mailbox::UnboundedMailbox,
8    Mailbox,
9};
10use commonware_cryptography::PublicKey;
11use commonware_utils::{channels::fallible::FallibleExt, ordered::Set};
12use futures::channel::{mpsc, oneshot};
13
/// Messages that can be sent to the tracker actor.
///
/// Variants are grouped below by the component that sends them. Variants that carry a
/// [oneshot::Sender] are answered over that channel (or, where noted, by dropping it).
#[derive(Debug)]
pub enum Message<C: PublicKey> {
    // ---------- Used by oracle ----------
    /// Register a peer set at a given index.
    ///
    /// `index` identifies the peer set and `peers` is the set of public keys it contains.
    Register { index: u64, peers: Set<C> },

    // ---------- Used by peer set provider ----------
    /// Fetch the peer set at a given index.
    PeerSet {
        /// The index of the peer set to fetch.
        index: u64,
        /// One-shot channel to send the peer set (`None` when no set is returned for `index`).
        responder: oneshot::Sender<Option<Set<C>>>,
    },
    /// Subscribe to notifications when new peer sets are added.
    Subscribe {
        /// One-shot channel to send the subscription receiver.
        ///
        /// NOTE(review): each notification is a `(u64, Set<C>, Set<C>)` tuple; the first
        /// element is presumably the peer set index, but the meaning of the two sets is not
        /// visible in this file — confirm against the tracker actor.
        #[allow(clippy::type_complexity)]
        responder: oneshot::Sender<mpsc::UnboundedReceiver<(u64, Set<C>, Set<C>)>>,
    },

    // ---------- Used by blocker ----------
    /// Block a peer, disconnecting them if currently connected and preventing future connections
    /// for as long as the peer remains in at least one active peer set.
    ///
    /// `public_key` is the public key of the peer to block.
    Block { public_key: C },

    // ---------- Used by peer ----------
    /// Notify the tracker that a peer has been successfully connected.
    ///
    /// The tracker responds with the greeting info that must be sent to the peer
    /// before any other messages. If the peer is not eligible, the channel is dropped
    /// (signaling termination).
    Connect {
        /// The public key of the peer.
        public_key: C,

        /// `true` if we are the dialer, `false` if we are the listener.
        dialer: bool,

        /// One-shot channel to return the greeting info. Dropped if peer is not eligible.
        responder: oneshot::Sender<types::Info<C>>,
    },

    /// Ready to send a [types::Payload::BitVec] message to a peer. This message doubles as a
    /// keep-alive signal to the peer.
    ///
    /// This request is formed on a recurring interval.
    Construct {
        /// The public key of the peer.
        public_key: C,

        /// The mailbox of the peer actor.
        peer: Mailbox<peer::Message<C>>,
    },

    /// Notify the tracker that a [types::Payload::BitVec] message has been received from a peer.
    ///
    /// The tracker will construct a [types::Payload::Peers] message in response.
    BitVec {
        /// The bit vector received.
        bit_vec: types::BitVec,

        /// The mailbox of the peer actor.
        peer: Mailbox<peer::Message<C>>,
    },

    /// Notify the tracker that a [types::Payload::Peers] message has been received from a peer.
    Peers {
        /// The list of peers received.
        peers: Vec<types::Info<C>>,
    },

    // ---------- Used by dialer ----------
    /// Request a list of dialable peers.
    Dialable {
        /// One-shot channel to send the list of dialable peers.
        responder: oneshot::Sender<Vec<C>>,
    },

    /// Request a reservation for a particular peer to dial.
    ///
    /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if the
    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
    /// has an active reservation).
    Dial {
        /// The public key of the peer to reserve.
        public_key: C,

        /// The sender to respond with the reservation.
        reservation: oneshot::Sender<Option<Reservation<C>>>,
    },

    // ---------- Used by listener ----------
    /// Check if a peer is acceptable (can accept an incoming connection from them).
    Acceptable {
        /// The public key of the peer to check.
        public_key: C,

        /// The sender to respond with whether the peer is acceptable.
        responder: oneshot::Sender<bool>,
    },

    /// Request a reservation for a particular peer.
    ///
    /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if the
    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
    /// has an active reservation).
    Listen {
        /// The public key of the peer to reserve.
        public_key: C,

        /// The sender to respond with the reservation.
        reservation: oneshot::Sender<Option<Reservation<C>>>,
    },

    // ---------- Used by reservation ----------
    /// Release a reservation.
    Release {
        /// The metadata of the reservation to release.
        metadata: Metadata<C>,
    },
}
137
138impl<C: PublicKey> UnboundedMailbox<Message<C>> {
139    /// Send a `Connect` message to the tracker and receive the greeting info.
140    ///
141    /// Returns `Some(info)` if the peer is eligible, `None` if the channel was
142    /// dropped (peer not eligible or tracker shut down).
143    pub async fn connect(&mut self, public_key: C, dialer: bool) -> Option<types::Info<C>> {
144        self.0
145            .request(|responder| Message::Connect {
146                public_key,
147                dialer,
148                responder,
149            })
150            .await
151    }
152
153    /// Send a `Construct` message to the tracker.
154    pub fn construct(&mut self, public_key: C, peer: Mailbox<peer::Message<C>>) {
155        self.0.send_lossy(Message::Construct { public_key, peer });
156    }
157
158    /// Send a `BitVec` message to the tracker.
159    pub fn bit_vec(&mut self, bit_vec: types::BitVec, peer: Mailbox<peer::Message<C>>) {
160        self.0.send_lossy(Message::BitVec { bit_vec, peer });
161    }
162
163    /// Send a `Peers` message to the tracker.
164    pub fn peers(&mut self, peers: Vec<types::Info<C>>) {
165        self.0.send_lossy(Message::Peers { peers });
166    }
167
168    /// Request a list of dialable peers from the tracker.
169    ///
170    /// Returns an empty list if the tracker is shut down.
171    pub async fn dialable(&mut self) -> Vec<C> {
172        self.0
173            .request_or_default(|responder| Message::Dialable { responder })
174            .await
175    }
176
177    /// Send a `Dial` message to the tracker.
178    ///
179    /// Returns `None` if the tracker is shut down.
180    pub async fn dial(&mut self, public_key: C) -> Option<Reservation<C>> {
181        self.0
182            .request(|reservation| Message::Dial {
183                public_key,
184                reservation,
185            })
186            .await
187            .flatten()
188    }
189
190    /// Send an `Acceptable` message to the tracker.
191    ///
192    /// Returns `false` if the tracker is shut down.
193    pub async fn acceptable(&mut self, public_key: C) -> bool {
194        self.0
195            .request_or(
196                |responder| Message::Acceptable {
197                    public_key,
198                    responder,
199                },
200                false,
201            )
202            .await
203    }
204
205    /// Send a `Listen` message to the tracker.
206    ///
207    /// Returns `None` if the tracker is shut down.
208    pub async fn listen(&mut self, public_key: C) -> Option<Reservation<C>> {
209        self.0
210            .request(|reservation| Message::Listen {
211                public_key,
212                reservation,
213            })
214            .await
215            .flatten()
216    }
217}
218
219/// Allows releasing reservations
220#[derive(Clone)]
221pub struct Releaser<C: PublicKey> {
222    sender: UnboundedMailbox<Message<C>>,
223}
224
225impl<C: PublicKey> Releaser<C> {
226    /// Create a new releaser.
227    pub(super) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
228        Self { sender }
229    }
230
231    /// Release a reservation.
232    pub fn release(&mut self, metadata: Metadata<C>) {
233        self.sender.0.send_lossy(Message::Release { metadata });
234    }
235}
236
237/// Mechanism to register authorized peers.
238///
239/// Peers that are not explicitly authorized
240/// will be blocked by commonware-p2p.
241#[derive(Debug, Clone)]
242pub struct Oracle<C: PublicKey> {
243    sender: UnboundedMailbox<Message<C>>,
244}
245
246impl<C: PublicKey> Oracle<C> {
247    pub(super) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
248        Self { sender }
249    }
250}
251
252impl<C: PublicKey> crate::Manager for Oracle<C> {
253    type PublicKey = C;
254    type Peers = Set<C>;
255
256    /// Register a set of authorized peers at a given index.
257    ///
258    /// These peer sets are used to construct a bit vector (sorted by public key)
259    /// to share knowledge about dialable IPs. If a peer does not yet have an index
260    /// associated with a bit vector, the discovery message will be dropped.
261    ///
262    /// # Parameters
263    ///
264    /// * `index` - Index of the set of authorized peers (like a blockchain height).
265    ///   Must be monotonically increasing, per the rules of [Set].
266    /// * `peers` - Vector of authorized peers at an `index` (does not need to be sorted).
267    async fn update(&mut self, index: u64, peers: Self::Peers) {
268        self.sender.0.send_lossy(Message::Register { index, peers });
269    }
270
271    async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
272        self.sender
273            .0
274            .request(|responder| Message::PeerSet {
275                index: id,
276                responder,
277            })
278            .await
279            .flatten()
280    }
281
282    async fn subscribe(
283        &mut self,
284    ) -> mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)> {
285        self.sender
286            .0
287            .request(|responder| Message::Subscribe { responder })
288            .await
289            .unwrap_or_else(|| {
290                let (_, rx) = mpsc::unbounded();
291                rx
292            })
293    }
294}
295
296impl<C: PublicKey> crate::Blocker for Oracle<C> {
297    type PublicKey = C;
298
299    async fn block(&mut self, public_key: Self::PublicKey) {
300        self.sender.0.send_lossy(Message::Block { public_key });
301    }
302}