Skip to main content

commonware_p2p/authenticated/discovery/actors/tracker/
ingress.rs

1use super::Reservation;
2use crate::{
3    authenticated::{
4        discovery::{
5            actors::{peer, tracker::Metadata},
6            types,
7        },
8        mailbox::UnboundedMailbox,
9        Mailbox,
10    },
11    PeerSetSubscription,
12};
13use commonware_cryptography::PublicKey;
14use commonware_utils::{
15    channel::{fallible::FallibleExt, mpsc, oneshot},
16    ordered::Set,
17};
18
/// Messages that can be sent to the tracker actor.
///
/// Variants are grouped below by the component that sends them. Variants that expect an
/// answer carry a one-shot sender through which the tracker replies.
#[derive(Debug)]
pub enum Message<C: PublicKey> {
    // ---------- Used by oracle ----------
    /// Register a peer set (`peers`) at a given `index`.
    Register { index: u64, peers: Set<C> },

    // ---------- Used by peer set provider ----------
    /// Fetch the peer set at a given index.
    PeerSet {
        /// The index of the peer set to fetch.
        index: u64,
        /// One-shot channel to send the peer set (the payload is an `Option`).
        responder: oneshot::Sender<Option<Set<C>>>,
    },
    /// Subscribe to notifications when new peer sets are added.
    Subscribe {
        /// One-shot channel to send the subscription receiver.
        responder: oneshot::Sender<PeerSetSubscription<C>>,
    },

    // ---------- Used by blocker ----------
    /// Block a peer, disconnecting them if currently connected and preventing future connections
    /// for as long as the peer remains in at least one active peer set.
    Block { public_key: C },

    // ---------- Used by peer ----------
    /// Notify the tracker that a peer has been successfully connected.
    ///
    /// The tracker responds with the greeting info that must be sent to the peer
    /// before any other messages. If the peer is not eligible, the channel is dropped
    /// (signaling termination).
    Connect {
        /// The public key of the peer.
        public_key: C,

        /// `true` if we are the dialer, `false` if we are the listener.
        dialer: bool,

        /// One-shot channel to return the greeting info. Dropped if peer is not eligible.
        responder: oneshot::Sender<types::Info<C>>,
    },

    /// Ready to send a [types::Payload::BitVec] message to a peer. This message doubles as a
    /// keep-alive signal to the peer.
    ///
    /// This request is formed on a recurring interval.
    Construct {
        /// The public key of the peer.
        public_key: C,

        /// The mailbox of the peer actor.
        peer: Mailbox<peer::Message<C>>,
    },

    /// Notify the tracker that a [types::Payload::BitVec] message has been received from a peer.
    ///
    /// The tracker will construct a [types::Payload::Peers] message in response.
    BitVec {
        /// The bit vector received.
        bit_vec: types::BitVec,

        /// The mailbox of the peer actor.
        peer: Mailbox<peer::Message<C>>,
    },

    /// Notify the tracker that a [types::Payload::Peers] message has been received from a peer.
    Peers {
        /// The list of peers received.
        peers: Vec<types::Info<C>>,
    },

    // ---------- Used by dialer ----------
    /// Request a list of dialable peers.
    Dialable {
        /// One-shot channel to send the list of dialable peers.
        responder: oneshot::Sender<Vec<C>>,
    },

    /// Request a reservation for a particular peer to dial.
    ///
    /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if the
    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
    /// has an active reservation).
    Dial {
        /// The public key of the peer to reserve.
        public_key: C,

        /// The sender to respond with the reservation.
        reservation: oneshot::Sender<Option<Reservation<C>>>,
    },

    // ---------- Used by listener ----------
    /// Check if a peer is acceptable (can accept an incoming connection from them).
    Acceptable {
        /// The public key of the peer to check.
        public_key: C,

        /// The sender to respond with whether the peer is acceptable.
        responder: oneshot::Sender<bool>,
    },

    /// Request a reservation for a particular peer.
    ///
    /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if the
    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
    /// has an active reservation).
    Listen {
        /// The public key of the peer to reserve.
        public_key: C,

        /// The sender to respond with the reservation.
        reservation: oneshot::Sender<Option<Reservation<C>>>,
    },

    // ---------- Used by reservation ----------
    /// Release a reservation.
    Release {
        /// The metadata of the reservation to release.
        metadata: Metadata<C>,
    },
}
141
142impl<C: PublicKey> UnboundedMailbox<Message<C>> {
143    /// Send a `Connect` message to the tracker and receive the greeting info.
144    ///
145    /// Returns `Some(info)` if the peer is eligible, `None` if the channel was
146    /// dropped (peer not eligible or tracker shut down).
147    pub async fn connect(&mut self, public_key: C, dialer: bool) -> Option<types::Info<C>> {
148        self.0
149            .request(|responder| Message::Connect {
150                public_key,
151                dialer,
152                responder,
153            })
154            .await
155    }
156
157    /// Send a `Construct` message to the tracker.
158    pub fn construct(&mut self, public_key: C, peer: Mailbox<peer::Message<C>>) {
159        self.0.send_lossy(Message::Construct { public_key, peer });
160    }
161
162    /// Send a `BitVec` message to the tracker.
163    pub fn bit_vec(&mut self, bit_vec: types::BitVec, peer: Mailbox<peer::Message<C>>) {
164        self.0.send_lossy(Message::BitVec { bit_vec, peer });
165    }
166
167    /// Send a `Peers` message to the tracker.
168    pub fn peers(&mut self, peers: Vec<types::Info<C>>) {
169        self.0.send_lossy(Message::Peers { peers });
170    }
171
172    /// Request a list of dialable peers from the tracker.
173    ///
174    /// Returns an empty list if the tracker is shut down.
175    pub async fn dialable(&mut self) -> Vec<C> {
176        self.0
177            .request_or_default(|responder| Message::Dialable { responder })
178            .await
179    }
180
181    /// Send a `Dial` message to the tracker.
182    ///
183    /// Returns `None` if the tracker is shut down.
184    pub async fn dial(&mut self, public_key: C) -> Option<Reservation<C>> {
185        self.0
186            .request(|reservation| Message::Dial {
187                public_key,
188                reservation,
189            })
190            .await
191            .flatten()
192    }
193
194    /// Send an `Acceptable` message to the tracker.
195    ///
196    /// Returns `false` if the tracker is shut down.
197    pub async fn acceptable(&mut self, public_key: C) -> bool {
198        self.0
199            .request_or(
200                |responder| Message::Acceptable {
201                    public_key,
202                    responder,
203                },
204                false,
205            )
206            .await
207    }
208
209    /// Send a `Listen` message to the tracker.
210    ///
211    /// Returns `None` if the tracker is shut down.
212    pub async fn listen(&mut self, public_key: C) -> Option<Reservation<C>> {
213        self.0
214            .request(|reservation| Message::Listen {
215                public_key,
216                reservation,
217            })
218            .await
219            .flatten()
220    }
221}
222
/// Allows releasing reservations.
///
/// Wraps a mailbox of tracker [`Message`]s; releasing a reservation sends a
/// [`Message::Release`] through it.
#[derive(Clone, Debug)]
pub struct Releaser<C: PublicKey> {
    /// Mailbox used to deliver `Release` messages.
    sender: UnboundedMailbox<Message<C>>,
}
228
229impl<C: PublicKey> Releaser<C> {
230    /// Create a new releaser.
231    pub(crate) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
232        Self { sender }
233    }
234
235    /// Release a reservation.
236    pub fn release(&mut self, metadata: Metadata<C>) {
237        self.sender.0.send_lossy(Message::Release { metadata });
238    }
239}
240
/// Mechanism to register authorized peers.
///
/// Peers that are not explicitly authorized
/// will be blocked by commonware-p2p.
///
/// In this file the oracle implements `crate::Provider`, `crate::Manager` and
/// `crate::Blocker`, each by sending a [`Message`] through the wrapped mailbox.
#[derive(Debug, Clone)]
pub struct Oracle<C: PublicKey> {
    /// Mailbox used to deliver messages on behalf of the oracle.
    sender: UnboundedMailbox<Message<C>>,
}
249
impl<C: PublicKey> Oracle<C> {
    /// Create a new oracle that sends its messages through `sender`.
    pub(super) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
        Self { sender }
    }
}
255
256impl<C: PublicKey> crate::Provider for Oracle<C> {
257    type PublicKey = C;
258
259    async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
260        self.sender
261            .0
262            .request(|responder| Message::PeerSet {
263                index: id,
264                responder,
265            })
266            .await
267            .flatten()
268    }
269
270    async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
271        self.sender
272            .0
273            .request(|responder| Message::Subscribe { responder })
274            .await
275            .unwrap_or_else(|| {
276                let (_, rx) = mpsc::unbounded_channel();
277                rx
278            })
279    }
280}
281
282impl<C: PublicKey> crate::Manager for Oracle<C> {
283    async fn track(&mut self, index: u64, peers: Set<Self::PublicKey>) {
284        self.sender.0.send_lossy(Message::Register { index, peers });
285    }
286}
287
288impl<C: PublicKey> crate::Blocker for Oracle<C> {
289    type PublicKey = C;
290
291    async fn block(&mut self, public_key: Self::PublicKey) {
292        self.sender.0.send_lossy(Message::Block { public_key });
293    }
294}