Skip to main content

commonware_p2p/authenticated/discovery/actors/tracker/
ingress.rs

1use super::Reservation;
2use crate::authenticated::{
3    discovery::{
4        actors::{peer, tracker::Metadata},
5        types,
6    },
7    mailbox::UnboundedMailbox,
8    Mailbox,
9};
10use commonware_cryptography::PublicKey;
11use commonware_utils::{
12    channel::{fallible::FallibleExt, mpsc, oneshot},
13    ordered::Set,
14};
15
16/// Messages that can be sent to the tracker actor.
17#[derive(Debug)]
18pub enum Message<C: PublicKey> {
19    // ---------- Used by oracle ----------
20    /// Register a peer set at a given index.
21    Register { index: u64, peers: Set<C> },
22
23    // ---------- Used by peer set provider ----------
24    /// Fetch the peer set at a given index.
25    PeerSet {
26        /// The index of the peer set to fetch.
27        index: u64,
28        /// One-shot channel to send the peer set.
29        responder: oneshot::Sender<Option<Set<C>>>,
30    },
31    /// Subscribe to notifications when new peer sets are added.
32    Subscribe {
33        /// One-shot channel to send the subscription receiver.
34        #[allow(clippy::type_complexity)]
35        responder: oneshot::Sender<mpsc::UnboundedReceiver<(u64, Set<C>, Set<C>)>>,
36    },
37
38    // ---------- Used by blocker ----------
39    /// Block a peer, disconnecting them if currently connected and preventing future connections
40    /// for as long as the peer remains in at least one active peer set.
41    Block { public_key: C },
42
43    // ---------- Used by peer ----------
44    /// Notify the tracker that a peer has been successfully connected.
45    ///
46    /// The tracker responds with the greeting info that must be sent to the peer
47    /// before any other messages. If the peer is not eligible, the channel is dropped
48    /// (signaling termination).
49    Connect {
50        /// The public key of the peer.
51        public_key: C,
52
53        /// `true` if we are the dialer, `false` if we are the listener.
54        dialer: bool,
55
56        /// One-shot channel to return the greeting info. Dropped if peer is not eligible.
57        responder: oneshot::Sender<types::Info<C>>,
58    },
59
60    /// Ready to send a [types::Payload::BitVec] message to a peer. This message doubles as a
61    /// keep-alive signal to the peer.
62    ///
63    /// This request is formed on a recurring interval.
64    Construct {
65        /// The public key of the peer.
66        public_key: C,
67
68        /// The mailbox of the peer actor.
69        peer: Mailbox<peer::Message<C>>,
70    },
71
72    /// Notify the tracker that a [types::Payload::BitVec] message has been received from a peer.
73    ///
74    /// The tracker will construct a [types::Payload::Peers] message in response.
75    BitVec {
76        /// The bit vector received.
77        bit_vec: types::BitVec,
78
79        /// The mailbox of the peer actor.
80        peer: Mailbox<peer::Message<C>>,
81    },
82
83    /// Notify the tracker that a [types::Payload::Peers] message has been received from a peer.
84    Peers {
85        /// The list of peers received.
86        peers: Vec<types::Info<C>>,
87    },
88
89    // ---------- Used by dialer ----------
90    /// Request a list of dialable peers.
91    Dialable {
92        /// One-shot channel to send the list of dialable peers.
93        responder: oneshot::Sender<Vec<C>>,
94    },
95
96    /// Request a reservation for a particular peer to dial.
97    ///
98    /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if the
99    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
100    /// has an active reservation).
101    Dial {
102        /// The public key of the peer to reserve.
103        public_key: C,
104
105        /// sender to respond with the reservation.
106        reservation: oneshot::Sender<Option<Reservation<C>>>,
107    },
108
109    // ---------- Used by listener ----------
110    /// Check if a peer is acceptable (can accept an incoming connection from them).
111    Acceptable {
112        /// The public key of the peer to check.
113        public_key: C,
114
115        /// The sender to respond with whether the peer is acceptable.
116        responder: oneshot::Sender<bool>,
117    },
118
119    /// Request a reservation for a particular peer.
120    ///
121    /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if  the
122    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
123    /// has an active reservation).
124    Listen {
125        /// The public key of the peer to reserve.
126        public_key: C,
127
128        /// The sender to respond with the reservation.
129        reservation: oneshot::Sender<Option<Reservation<C>>>,
130    },
131
132    // ---------- Used by reservation ----------
133    /// Release a reservation.
134    Release {
135        /// The metadata of the reservation to release.
136        metadata: Metadata<C>,
137    },
138}
139
140impl<C: PublicKey> UnboundedMailbox<Message<C>> {
141    /// Send a `Connect` message to the tracker and receive the greeting info.
142    ///
143    /// Returns `Some(info)` if the peer is eligible, `None` if the channel was
144    /// dropped (peer not eligible or tracker shut down).
145    pub async fn connect(&mut self, public_key: C, dialer: bool) -> Option<types::Info<C>> {
146        self.0
147            .request(|responder| Message::Connect {
148                public_key,
149                dialer,
150                responder,
151            })
152            .await
153    }
154
155    /// Send a `Construct` message to the tracker.
156    pub fn construct(&mut self, public_key: C, peer: Mailbox<peer::Message<C>>) {
157        self.0.send_lossy(Message::Construct { public_key, peer });
158    }
159
160    /// Send a `BitVec` message to the tracker.
161    pub fn bit_vec(&mut self, bit_vec: types::BitVec, peer: Mailbox<peer::Message<C>>) {
162        self.0.send_lossy(Message::BitVec { bit_vec, peer });
163    }
164
165    /// Send a `Peers` message to the tracker.
166    pub fn peers(&mut self, peers: Vec<types::Info<C>>) {
167        self.0.send_lossy(Message::Peers { peers });
168    }
169
170    /// Request a list of dialable peers from the tracker.
171    ///
172    /// Returns an empty list if the tracker is shut down.
173    pub async fn dialable(&mut self) -> Vec<C> {
174        self.0
175            .request_or_default(|responder| Message::Dialable { responder })
176            .await
177    }
178
179    /// Send a `Dial` message to the tracker.
180    ///
181    /// Returns `None` if the tracker is shut down.
182    pub async fn dial(&mut self, public_key: C) -> Option<Reservation<C>> {
183        self.0
184            .request(|reservation| Message::Dial {
185                public_key,
186                reservation,
187            })
188            .await
189            .flatten()
190    }
191
192    /// Send an `Acceptable` message to the tracker.
193    ///
194    /// Returns `false` if the tracker is shut down.
195    pub async fn acceptable(&mut self, public_key: C) -> bool {
196        self.0
197            .request_or(
198                |responder| Message::Acceptable {
199                    public_key,
200                    responder,
201                },
202                false,
203            )
204            .await
205    }
206
207    /// Send a `Listen` message to the tracker.
208    ///
209    /// Returns `None` if the tracker is shut down.
210    pub async fn listen(&mut self, public_key: C) -> Option<Reservation<C>> {
211        self.0
212            .request(|reservation| Message::Listen {
213                public_key,
214                reservation,
215            })
216            .await
217            .flatten()
218    }
219}
220
221/// Allows releasing reservations
222#[derive(Clone, Debug)]
223pub struct Releaser<C: PublicKey> {
224    sender: UnboundedMailbox<Message<C>>,
225}
226
227impl<C: PublicKey> Releaser<C> {
228    /// Create a new releaser.
229    pub(crate) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
230        Self { sender }
231    }
232
233    /// Release a reservation.
234    pub fn release(&mut self, metadata: Metadata<C>) {
235        self.sender.0.send_lossy(Message::Release { metadata });
236    }
237}
238
239/// Mechanism to register authorized peers.
240///
241/// Peers that are not explicitly authorized
242/// will be blocked by commonware-p2p.
243#[derive(Debug, Clone)]
244pub struct Oracle<C: PublicKey> {
245    sender: UnboundedMailbox<Message<C>>,
246}
247
248impl<C: PublicKey> Oracle<C> {
249    pub(super) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
250        Self { sender }
251    }
252}
253
254impl<C: PublicKey> crate::Provider for Oracle<C> {
255    type PublicKey = C;
256
257    async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
258        self.sender
259            .0
260            .request(|responder| Message::PeerSet {
261                index: id,
262                responder,
263            })
264            .await
265            .flatten()
266    }
267
268    async fn subscribe(
269        &mut self,
270    ) -> mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)> {
271        self.sender
272            .0
273            .request(|responder| Message::Subscribe { responder })
274            .await
275            .unwrap_or_else(|| {
276                let (_, rx) = mpsc::unbounded_channel();
277                rx
278            })
279    }
280}
281
282impl<C: PublicKey> crate::Manager for Oracle<C> {
283    async fn track(&mut self, index: u64, peers: Set<Self::PublicKey>) {
284        self.sender.0.send_lossy(Message::Register { index, peers });
285    }
286}
287
288impl<C: PublicKey> crate::Blocker for Oracle<C> {
289    type PublicKey = C;
290
291    async fn block(&mut self, public_key: Self::PublicKey) {
292        self.sender.0.send_lossy(Message::Block { public_key });
293    }
294}