Skip to main content

commonware_p2p/authenticated/discovery/actors/tracker/
ingress.rs

1use super::Reservation;
2use crate::{
3    authenticated::{
4        dialing::Dialable,
5        discovery::{
6            actors::{peer, tracker::Metadata},
7            types,
8        },
9    },
10    PeerSetSubscription, TrackedPeers,
11};
12use commonware_actor::{
13    mailbox::{self, Policy},
14    Feedback,
15};
16use commonware_cryptography::PublicKey;
17use commonware_utils::channel::{mpsc, oneshot};
18use std::collections::VecDeque;
19
20/// Messages that can be sent to the tracker actor.
21#[derive(Debug)]
22pub enum Message<C: PublicKey> {
23    // ---------- Used by oracle ----------
24    /// Register a peer set at a given index.
25    Register { index: u64, peers: TrackedPeers<C> },
26
27    // ---------- Used by peer set provider ----------
28    /// Fetch primary and secondary peers for a given ID.
29    PeerSet {
30        /// The index of the peer set to fetch.
31        index: u64,
32        /// One-shot channel to send the tracked peers.
33        responder: oneshot::Sender<Option<TrackedPeers<C>>>,
34    },
35    /// Subscribe to notifications when new peer sets are added.
36    Subscribe {
37        /// One-shot channel to send the subscription receiver.
38        responder: oneshot::Sender<PeerSetSubscription<C>>,
39    },
40
41    // ---------- Used by blocker ----------
42    /// Block a peer, disconnecting them if currently connected and preventing future connections
43    /// for as long as the peer remains in at least one active peer set.
44    Block { public_key: C },
45
46    // ---------- Used by peer ----------
47    /// Notify the tracker that a peer has been successfully connected.
48    ///
49    /// The tracker responds with the greeting info that must be sent to the peer
50    /// before any other messages. If the peer is not eligible, the channel is dropped
51    /// (signaling termination).
52    Connect {
53        /// The public key of the peer.
54        public_key: C,
55
56        /// The mailbox of the peer actor.
57        peer: peer::Mailbox<C>,
58
59        /// `true` if we are the dialer, `false` if we are the listener.
60        dialer: bool,
61
62        /// One-shot channel to return the greeting info. Dropped if peer is not eligible.
63        responder: oneshot::Sender<types::Info<C>>,
64    },
65
66    /// Ready to send a [types::Payload::BitVec] message to a peer. This message doubles as a
67    /// keep-alive signal to the peer.
68    ///
69    /// This request is formed on a recurring interval. The tracker sends any response to the
70    /// mailbox stored by the peer's most recent [`Message::Connect`].
71    Construct {
72        /// The public key of the peer.
73        public_key: C,
74    },
75
76    /// Notify the tracker that a [types::Payload::BitVec] message has been received from a peer.
77    ///
78    /// The tracker will construct a [types::Payload::Peers] message in response and send it to the
79    /// mailbox stored by the peer's most recent [`Message::Connect`].
80    BitVec {
81        /// The public key of the peer that sent the bit vector.
82        public_key: C,
83
84        /// The bit vector received.
85        bit_vec: types::BitVec,
86    },
87
88    /// Notify the tracker that a [types::Payload::Peers] message has been received from a peer.
89    Peers {
90        /// The list of peers received.
91        peers: Vec<types::Info<C>>,
92    },
93
94    // ---------- Used by dialer ----------
95    /// Request a list of dialable peers.
96    Dialable {
97        /// One-shot channel to send the dialable peers and next query deadline.
98        responder: oneshot::Sender<Dialable<C>>,
99    },
100
101    /// Request a reservation for a particular peer to dial.
102    ///
103    /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if the
104    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
105    /// has an active reservation).
106    Dial {
107        /// The public key of the peer to reserve.
108        public_key: C,
109
110        /// sender to respond with the reservation.
111        reservation: oneshot::Sender<Option<Reservation<C>>>,
112    },
113
114    // ---------- Used by listener ----------
115    /// Check if a peer is acceptable (can accept an incoming connection from them).
116    Acceptable {
117        /// The public key of the peer to check.
118        public_key: C,
119
120        /// The sender to respond with whether the peer is acceptable.
121        responder: oneshot::Sender<bool>,
122    },
123
124    /// Request a reservation for a particular peer.
125    ///
126    /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if  the
127    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
128    /// has an active reservation).
129    Listen {
130        /// The public key of the peer to reserve.
131        public_key: C,
132
133        /// The sender to respond with the reservation.
134        reservation: oneshot::Sender<Option<Reservation<C>>>,
135    },
136
137    // ---------- Used by reservation ----------
138    /// Release a reservation.
139    Release {
140        /// The metadata of the reservation to release.
141        metadata: Metadata<C>,
142    },
143}
144
145impl<C: PublicKey> Policy for Message<C> {
146    type Overflow = VecDeque<Self>;
147
148    fn handle(overflow: &mut Self::Overflow, message: Self) {
149        overflow.push_back(message);
150    }
151}
152
153/// Mailbox for sending messages to the tracker actor.
154#[derive(Clone, Debug)]
155pub struct Mailbox<C: PublicKey>(mailbox::Sender<Message<C>>);
156
157impl<C: PublicKey> Mailbox<C> {
158    pub(crate) const fn new(sender: mailbox::Sender<Message<C>>) -> Self {
159        Self(sender)
160    }
161
162    /// Send a `Connect` message to the tracker and receive the greeting info.
163    ///
164    /// Returns `Some(info)` if the peer is eligible, `None` if the channel was
165    /// dropped (peer not eligible or tracker shut down).
166    pub(crate) async fn connect(
167        &self,
168        public_key: C,
169        peer: peer::Mailbox<C>,
170        dialer: bool,
171    ) -> Option<types::Info<C>> {
172        let (responder, receiver) = oneshot::channel();
173        let _ = self.0.enqueue(Message::Connect {
174            public_key,
175            peer,
176            dialer,
177            responder,
178        });
179        receiver.await.ok()
180    }
181
182    /// Send a `Construct` message to the tracker.
183    pub(crate) fn construct(&self, public_key: C) -> Feedback {
184        self.0.enqueue(Message::Construct { public_key })
185    }
186
187    /// Send a `BitVec` message to the tracker.
188    pub(crate) fn bit_vec(&self, public_key: C, bit_vec: types::BitVec) -> Feedback {
189        self.0.enqueue(Message::BitVec {
190            public_key,
191            bit_vec,
192        })
193    }
194
195    /// Send a `Peers` message to the tracker.
196    pub(crate) fn peers(&self, peers: Vec<types::Info<C>>) -> Feedback {
197        self.0.enqueue(Message::Peers { peers })
198    }
199
200    /// Request dialable peers from the tracker.
201    ///
202    /// Returns an empty response if the tracker is shut down.
203    pub(crate) async fn dialable(&self) -> Dialable<C> {
204        let (responder, receiver) = oneshot::channel();
205        let _ = self.0.enqueue(Message::Dialable { responder });
206        receiver.await.unwrap_or_default()
207    }
208
209    /// Send a `Dial` message to the tracker.
210    ///
211    /// Returns `None` if the tracker is shut down.
212    pub(crate) async fn dial(&self, public_key: C) -> Option<Reservation<C>> {
213        let (reservation, receiver) = oneshot::channel();
214        let _ = self.0.enqueue(Message::Dial {
215            public_key,
216            reservation,
217        });
218        receiver.await.ok().flatten()
219    }
220
221    /// Send an `Acceptable` message to the tracker.
222    ///
223    /// Returns `false` if the tracker is shut down.
224    pub(crate) async fn acceptable(&self, public_key: C) -> bool {
225        let (responder, receiver) = oneshot::channel();
226        let _ = self.0.enqueue(Message::Acceptable {
227            public_key,
228            responder,
229        });
230        receiver.await.unwrap_or(false)
231    }
232
233    /// Send a `Listen` message to the tracker.
234    ///
235    /// Returns `None` if the tracker is shut down.
236    pub(crate) async fn listen(&self, public_key: C) -> Option<Reservation<C>> {
237        let (reservation, receiver) = oneshot::channel();
238        let _ = self.0.enqueue(Message::Listen {
239            public_key,
240            reservation,
241        });
242        receiver.await.ok().flatten()
243    }
244}
245
246/// Allows releasing reservations
247#[derive(Clone, Debug)]
248pub struct Releaser<C: PublicKey> {
249    sender: mailbox::Sender<Message<C>>,
250}
251
252impl<C: PublicKey> Releaser<C> {
253    /// Create a new releaser.
254    pub(crate) const fn new(sender: mailbox::Sender<Message<C>>) -> Self {
255        Self { sender }
256    }
257
258    /// Release a reservation.
259    pub fn release(&mut self, metadata: Metadata<C>) -> Feedback {
260        self.sender.enqueue(Message::Release { metadata })
261    }
262}
263
264/// Mechanism to register authorized peers.
265///
266/// Peers that are not explicitly authorized
267/// will be blocked by commonware-p2p.
268#[derive(Debug, Clone)]
269pub struct Oracle<C: PublicKey> {
270    sender: mailbox::Sender<Message<C>>,
271}
272
273impl<C: PublicKey> Oracle<C> {
274    pub(super) const fn new(sender: mailbox::Sender<Message<C>>) -> Self {
275        Self { sender }
276    }
277}
278
279impl<C: PublicKey> crate::Provider for Oracle<C> {
280    type PublicKey = C;
281
282    async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
283        let (responder, receiver) = oneshot::channel();
284        let _ = self.sender.enqueue(Message::PeerSet {
285            index: id,
286            responder,
287        });
288        receiver.await.ok().flatten()
289    }
290
291    async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
292        let (responder, receiver) = oneshot::channel();
293        let _ = self.sender.enqueue(Message::Subscribe { responder });
294        receiver.await.unwrap_or_else(|_| {
295            let (_, rx) = mpsc::unbounded_channel();
296            rx
297        })
298    }
299}
300
301impl<C: PublicKey> crate::Manager for Oracle<C> {
302    fn track<R>(&mut self, index: u64, peers: R) -> Feedback
303    where
304        R: Into<TrackedPeers<Self::PublicKey>> + Send,
305    {
306        self.sender.enqueue(Message::Register {
307            index,
308            peers: peers.into(),
309        })
310    }
311}
312
313impl<C: PublicKey> crate::Blocker for Oracle<C> {
314    type PublicKey = C;
315
316    fn block(&mut self, public_key: Self::PublicKey) -> Feedback {
317        self.sender.enqueue(Message::Block { public_key })
318    }
319}