commonware_p2p/authenticated/lookup/actors/tracker/
ingress.rs

1use super::Reservation;
2use crate::{
3    authenticated::{
4        lookup::actors::{peer, tracker::Metadata},
5        mailbox::UnboundedMailbox,
6        Mailbox,
7    },
8    types::Address,
9    Ingress,
10};
11use commonware_cryptography::PublicKey;
12use commonware_utils::{
13    channels::fallible::FallibleExt,
14    ordered::{Map, Set},
15};
16use futures::channel::{mpsc, oneshot};
17use std::net::IpAddr;
18
19/// Messages that can be sent to the tracker actor.
20#[derive(Debug)]
21pub enum Message<C: PublicKey> {
22    // ---------- Used by oracle ----------
23    /// Register a peer set at a given index.
24    Register { index: u64, peers: Map<C, Address> },
25
26    // ---------- Used by peer set provider ----------
27    /// Fetch the peer set at a given index.
28    PeerSet {
29        /// The index of the peer set to fetch.
30        index: u64,
31        /// One-shot channel to send the peer set.
32        responder: oneshot::Sender<Option<Set<C>>>,
33    },
34    /// Subscribe to notifications when new peer sets are added.
35    Subscribe {
36        /// One-shot channel to send the subscription receiver.
37        #[allow(clippy::type_complexity)]
38        responder: oneshot::Sender<mpsc::UnboundedReceiver<(u64, Set<C>, Set<C>)>>,
39    },
40
41    // ---------- Used by blocker ----------
42    /// Block a peer, disconnecting them if currently connected and preventing future connections
43    /// for as long as the peer remains in at least one active peer set.
44    Block { public_key: C },
45
46    // ---------- Used by peer ----------
47    /// Notify the tracker that a peer has been successfully connected.
48    Connect {
49        /// The public key of the peer.
50        public_key: C,
51
52        /// The mailbox of the peer actor.
53        peer: Mailbox<peer::Message>,
54    },
55
56    // ---------- Used by dialer ----------
57    /// Request a list of dialable peers.
58    Dialable {
59        /// One-shot channel to send the list of dialable peers.
60        responder: oneshot::Sender<Vec<C>>,
61    },
62
63    /// Request a reservation for a particular peer to dial.
64    ///
65    /// The tracker will respond with an [`Option<(Reservation<C>, Ingress)>`], which will be
66    /// `None` if the reservation cannot be granted (e.g., if the peer is already connected,
67    /// blocked or already has an active reservation).
68    Dial {
69        /// The public key of the peer to reserve.
70        public_key: C,
71
72        /// Sender to respond with the reservation and ingress address.
73        reservation: oneshot::Sender<Option<(Reservation<C>, Ingress)>>,
74    },
75
76    // ---------- Used by listener ----------
77    /// Check if a peer is acceptable (can accept an incoming connection from them).
78    Acceptable {
79        /// The public key of the peer to check.
80        public_key: C,
81
82        /// The IP address the peer connected from.
83        source_ip: IpAddr,
84
85        /// The sender to respond with whether the peer is acceptable.
86        responder: oneshot::Sender<bool>,
87    },
88
89    /// Request a reservation for a particular peer.
90    ///
91    /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if  the
92    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
93    /// has an active reservation).
94    Listen {
95        /// The public key of the peer to reserve.
96        public_key: C,
97
98        /// The sender to respond with the reservation.
99        reservation: oneshot::Sender<Option<Reservation<C>>>,
100    },
101
102    // ---------- Used by reservation ----------
103    /// Release a reservation.
104    Release {
105        /// The metadata of the reservation to release.
106        metadata: Metadata<C>,
107    },
108}
109
110impl<C: PublicKey> UnboundedMailbox<Message<C>> {
111    /// Send a `Connect` message to the tracker.
112    pub fn connect(&mut self, public_key: C, peer: Mailbox<peer::Message>) {
113        self.0.send_lossy(Message::Connect { public_key, peer });
114    }
115
116    /// Request a list of dialable peers from the tracker.
117    ///
118    /// Returns an empty list if the tracker is shut down.
119    pub async fn dialable(&mut self) -> Vec<C> {
120        self.0
121            .request_or_default(|responder| Message::Dialable { responder })
122            .await
123    }
124
125    /// Send a `Dial` message to the tracker.
126    ///
127    /// Returns `None` if the tracker is shut down.
128    pub async fn dial(&mut self, public_key: C) -> Option<(Reservation<C>, Ingress)> {
129        self.0
130            .request(|reservation| Message::Dial {
131                public_key,
132                reservation,
133            })
134            .await
135            .flatten()
136    }
137
138    /// Send an `Acceptable` message to the tracker.
139    ///
140    /// Returns `false` if the tracker is shut down.
141    pub async fn acceptable(&mut self, public_key: C, source_ip: IpAddr) -> bool {
142        self.0
143            .request_or(
144                |responder| Message::Acceptable {
145                    public_key,
146                    source_ip,
147                    responder,
148                },
149                false,
150            )
151            .await
152    }
153
154    /// Send a `Listen` message to the tracker.
155    ///
156    /// Returns `None` if the tracker is shut down.
157    pub async fn listen(&mut self, public_key: C) -> Option<Reservation<C>> {
158        self.0
159            .request(|reservation| Message::Listen {
160                public_key,
161                reservation,
162            })
163            .await
164            .flatten()
165    }
166}
167
168/// Allows releasing reservations
169#[derive(Clone)]
170pub struct Releaser<C: PublicKey> {
171    sender: UnboundedMailbox<Message<C>>,
172}
173
174impl<C: PublicKey> Releaser<C> {
175    /// Create a new releaser.
176    pub(super) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
177        Self { sender }
178    }
179
180    /// Release a reservation.
181    pub fn release(&mut self, metadata: Metadata<C>) {
182        self.sender.0.send_lossy(Message::Release { metadata });
183    }
184}
185
186/// Mechanism to register authorized peers.
187///
188/// Peers that are not explicitly authorized
189/// will be blocked by commonware-p2p.
190#[derive(Debug, Clone)]
191pub struct Oracle<C: PublicKey> {
192    sender: UnboundedMailbox<Message<C>>,
193}
194
195impl<C: PublicKey> Oracle<C> {
196    pub(super) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
197        Self { sender }
198    }
199}
200
201impl<C: PublicKey> crate::Manager for Oracle<C> {
202    type PublicKey = C;
203    type Peers = Map<C, Address>;
204
205    /// Register a set of authorized peers at a given index.
206    ///
207    /// # Parameters
208    ///
209    /// * `index` - Index of the set of authorized peers (like a blockchain height).
210    ///   Should be monotonically increasing.
211    /// * `peers` - Vector of authorized peers at an `index`.
212    ///   Each element contains the public key and address specification of the peer.
213    async fn update(&mut self, index: u64, peers: Self::Peers) {
214        self.sender.0.send_lossy(Message::Register { index, peers });
215    }
216
217    async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
218        self.sender
219            .0
220            .request(|responder| Message::PeerSet {
221                index: id,
222                responder,
223            })
224            .await
225            .flatten()
226    }
227
228    async fn subscribe(
229        &mut self,
230    ) -> mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)> {
231        self.sender
232            .0
233            .request(|responder| Message::Subscribe { responder })
234            .await
235            .unwrap_or_else(|| {
236                let (_, rx) = mpsc::unbounded();
237                rx
238            })
239    }
240}
241
242impl<C: PublicKey> crate::Blocker for Oracle<C> {
243    type PublicKey = C;
244
245    async fn block(&mut self, public_key: Self::PublicKey) {
246        self.sender.0.send_lossy(Message::Block { public_key });
247    }
248}