Skip to main content

commonware_p2p/authenticated/discovery/actors/tracker/
ingress.rs

1use super::Reservation;
2use crate::{
3    authenticated::{
4        dialing::Dialable,
5        discovery::{
6            actors::{peer, tracker::Metadata},
7            types,
8        },
9        mailbox::UnboundedMailbox,
10        Mailbox,
11    },
12    PeerSetSubscription, TrackedPeers,
13};
14use commonware_cryptography::PublicKey;
15use commonware_utils::channel::{fallible::FallibleExt, mpsc, oneshot};
16
17/// Messages that can be sent to the tracker actor.
18#[derive(Debug)]
19pub enum Message<C: PublicKey> {
20    // ---------- Used by oracle ----------
21    /// Register a peer set at a given index.
22    Register { index: u64, peers: TrackedPeers<C> },
23
24    // ---------- Used by peer set provider ----------
25    /// Fetch primary and secondary peers for a given ID.
26    PeerSet {
27        /// The index of the peer set to fetch.
28        index: u64,
29        /// One-shot channel to send the tracked peers.
30        responder: oneshot::Sender<Option<TrackedPeers<C>>>,
31    },
32    /// Subscribe to notifications when new peer sets are added.
33    Subscribe {
34        /// One-shot channel to send the subscription receiver.
35        responder: oneshot::Sender<PeerSetSubscription<C>>,
36    },
37
38    // ---------- Used by blocker ----------
39    /// Block a peer, disconnecting them if currently connected and preventing future connections
40    /// for as long as the peer remains in at least one active peer set.
41    Block { public_key: C },
42
43    // ---------- Used by peer ----------
44    /// Notify the tracker that a peer has been successfully connected.
45    ///
46    /// The tracker responds with the greeting info that must be sent to the peer
47    /// before any other messages. If the peer is not eligible, the channel is dropped
48    /// (signaling termination).
49    Connect {
50        /// The public key of the peer.
51        public_key: C,
52
53        /// `true` if we are the dialer, `false` if we are the listener.
54        dialer: bool,
55
56        /// One-shot channel to return the greeting info. Dropped if peer is not eligible.
57        responder: oneshot::Sender<types::Info<C>>,
58    },
59
60    /// Ready to send a [types::Payload::BitVec] message to a peer. This message doubles as a
61    /// keep-alive signal to the peer.
62    ///
63    /// This request is formed on a recurring interval.
64    Construct {
65        /// The public key of the peer.
66        public_key: C,
67
68        /// The mailbox of the peer actor.
69        peer: Mailbox<peer::Message<C>>,
70    },
71
72    /// Notify the tracker that a [types::Payload::BitVec] message has been received from a peer.
73    ///
74    /// The tracker will construct a [types::Payload::Peers] message in response.
75    BitVec {
76        /// The bit vector received.
77        bit_vec: types::BitVec,
78
79        /// The mailbox of the peer actor.
80        peer: Mailbox<peer::Message<C>>,
81    },
82
83    /// Notify the tracker that a [types::Payload::Peers] message has been received from a peer.
84    Peers {
85        /// The list of peers received.
86        peers: Vec<types::Info<C>>,
87    },
88
89    // ---------- Used by dialer ----------
90    /// Request a list of dialable peers.
91    Dialable {
92        /// One-shot channel to send the dialable peers and next query deadline.
93        responder: oneshot::Sender<Dialable<C>>,
94    },
95
96    /// Request a reservation for a particular peer to dial.
97    ///
98    /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if the
99    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
100    /// has an active reservation).
101    Dial {
102        /// The public key of the peer to reserve.
103        public_key: C,
104
105        /// sender to respond with the reservation.
106        reservation: oneshot::Sender<Option<Reservation<C>>>,
107    },
108
109    // ---------- Used by listener ----------
110    /// Check if a peer is acceptable (can accept an incoming connection from them).
111    Acceptable {
112        /// The public key of the peer to check.
113        public_key: C,
114
115        /// The sender to respond with whether the peer is acceptable.
116        responder: oneshot::Sender<bool>,
117    },
118
119    /// Request a reservation for a particular peer.
120    ///
121    /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if  the
122    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
123    /// has an active reservation).
124    Listen {
125        /// The public key of the peer to reserve.
126        public_key: C,
127
128        /// The sender to respond with the reservation.
129        reservation: oneshot::Sender<Option<Reservation<C>>>,
130    },
131
132    // ---------- Used by reservation ----------
133    /// Release a reservation.
134    Release {
135        /// The metadata of the reservation to release.
136        metadata: Metadata<C>,
137    },
138}
139
140impl<C: PublicKey> UnboundedMailbox<Message<C>> {
141    /// Send a `Connect` message to the tracker and receive the greeting info.
142    ///
143    /// Returns `Some(info)` if the peer is eligible, `None` if the channel was
144    /// dropped (peer not eligible or tracker shut down).
145    pub async fn connect(&mut self, public_key: C, dialer: bool) -> Option<types::Info<C>> {
146        self.0
147            .request(|responder| Message::Connect {
148                public_key,
149                dialer,
150                responder,
151            })
152            .await
153    }
154
155    /// Send a `Construct` message to the tracker.
156    pub fn construct(&mut self, public_key: C, peer: Mailbox<peer::Message<C>>) {
157        self.0.send_lossy(Message::Construct { public_key, peer });
158    }
159
160    /// Send a `BitVec` message to the tracker.
161    pub fn bit_vec(&mut self, bit_vec: types::BitVec, peer: Mailbox<peer::Message<C>>) {
162        self.0.send_lossy(Message::BitVec { bit_vec, peer });
163    }
164
165    /// Send a `Peers` message to the tracker.
166    pub fn peers(&mut self, peers: Vec<types::Info<C>>) {
167        self.0.send_lossy(Message::Peers { peers });
168    }
169
170    /// Request dialable peers from the tracker.
171    ///
172    /// Returns an empty response if the tracker is shut down.
173    pub async fn dialable(&mut self) -> Dialable<C> {
174        self.0
175            .request_or_default(|responder| Message::Dialable { responder })
176            .await
177    }
178
179    /// Send a `Dial` message to the tracker.
180    ///
181    /// Returns `None` if the tracker is shut down.
182    pub async fn dial(&mut self, public_key: C) -> Option<Reservation<C>> {
183        self.0
184            .request(|reservation| Message::Dial {
185                public_key,
186                reservation,
187            })
188            .await
189            .flatten()
190    }
191
192    /// Send an `Acceptable` message to the tracker.
193    ///
194    /// Returns `false` if the tracker is shut down.
195    pub async fn acceptable(&mut self, public_key: C) -> bool {
196        self.0
197            .request_or(
198                |responder| Message::Acceptable {
199                    public_key,
200                    responder,
201                },
202                false,
203            )
204            .await
205    }
206
207    /// Send a `Listen` message to the tracker.
208    ///
209    /// Returns `None` if the tracker is shut down.
210    pub async fn listen(&mut self, public_key: C) -> Option<Reservation<C>> {
211        self.0
212            .request(|reservation| Message::Listen {
213                public_key,
214                reservation,
215            })
216            .await
217            .flatten()
218    }
219}
220
221/// Allows releasing reservations
222#[derive(Clone, Debug)]
223pub struct Releaser<C: PublicKey> {
224    sender: UnboundedMailbox<Message<C>>,
225}
226
227impl<C: PublicKey> Releaser<C> {
228    /// Create a new releaser.
229    pub(crate) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
230        Self { sender }
231    }
232
233    /// Release a reservation.
234    pub fn release(&mut self, metadata: Metadata<C>) {
235        self.sender.0.send_lossy(Message::Release { metadata });
236    }
237}
238
239/// Mechanism to register authorized peers.
240///
241/// Peers that are not explicitly authorized
242/// will be blocked by commonware-p2p.
243#[derive(Debug, Clone)]
244pub struct Oracle<C: PublicKey> {
245    sender: UnboundedMailbox<Message<C>>,
246}
247
248impl<C: PublicKey> Oracle<C> {
249    pub(super) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
250        Self { sender }
251    }
252}
253
254impl<C: PublicKey> crate::Provider for Oracle<C> {
255    type PublicKey = C;
256
257    async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
258        self.sender
259            .0
260            .request(|responder| Message::PeerSet {
261                index: id,
262                responder,
263            })
264            .await
265            .flatten()
266    }
267
268    async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
269        self.sender
270            .0
271            .request(|responder| Message::Subscribe { responder })
272            .await
273            .unwrap_or_else(|| {
274                let (_, rx) = mpsc::unbounded_channel();
275                rx
276            })
277    }
278}
279
280impl<C: PublicKey> crate::Manager for Oracle<C> {
281    async fn track<R>(&mut self, index: u64, peers: R)
282    where
283        R: Into<TrackedPeers<Self::PublicKey>> + Send,
284    {
285        self.sender.0.send_lossy(Message::Register {
286            index,
287            peers: peers.into(),
288        });
289    }
290}
291
292impl<C: PublicKey> crate::Blocker for Oracle<C> {
293    type PublicKey = C;
294
295    async fn block(&mut self, public_key: Self::PublicKey) {
296        self.sender.0.send_lossy(Message::Block { public_key });
297    }
298}