Skip to main content

commonware_p2p/authenticated/lookup/actors/tracker/
ingress.rs

1use super::Reservation;
2use crate::{
3    authenticated::{
4        lookup::actors::{peer, tracker::Metadata},
5        mailbox::UnboundedMailbox,
6        Mailbox,
7    },
8    types::Address,
9    Ingress,
10};
11use commonware_cryptography::PublicKey;
12use commonware_utils::{
13    channel::{fallible::FallibleExt, mpsc, oneshot},
14    ordered::{Map, Set},
15};
16use std::net::IpAddr;
17
18/// Messages that can be sent to the tracker actor.
19#[derive(Debug)]
20pub enum Message<C: PublicKey> {
21    // ---------- Used by oracle ----------
22    /// Register a peer set at a given index.
23    Register { index: u64, peers: Map<C, Address> },
24
25    /// Update addresses for multiple peers without creating a new peer set.
26    Overwrite { peers: Map<C, Address> },
27
28    // ---------- Used by peer set provider ----------
29    /// Fetch the peer set at a given index.
30    PeerSet {
31        /// The index of the peer set to fetch.
32        index: u64,
33        /// One-shot channel to send the peer set.
34        responder: oneshot::Sender<Option<Set<C>>>,
35    },
36    /// Subscribe to notifications when new peer sets are added.
37    Subscribe {
38        /// One-shot channel to send the subscription receiver.
39        #[allow(clippy::type_complexity)]
40        responder: oneshot::Sender<mpsc::UnboundedReceiver<(u64, Set<C>, Set<C>)>>,
41    },
42
43    // ---------- Used by blocker ----------
44    /// Block a peer, disconnecting them if currently connected and preventing future connections
45    /// for as long as the peer remains in at least one active peer set.
46    Block { public_key: C },
47
48    // ---------- Used by peer ----------
49    /// Notify the tracker that a peer has been successfully connected.
50    Connect {
51        /// The public key of the peer.
52        public_key: C,
53
54        /// The mailbox of the peer actor.
55        peer: Mailbox<peer::Message>,
56    },
57
58    // ---------- Used by dialer ----------
59    /// Request a list of dialable peers.
60    Dialable {
61        /// One-shot channel to send the list of dialable peers.
62        responder: oneshot::Sender<Vec<C>>,
63    },
64
65    /// Request a reservation for a particular peer to dial.
66    ///
67    /// The tracker will respond with an [`Option<(Reservation<C>, Ingress)>`], which will be
68    /// `None` if the reservation cannot be granted (e.g., if the peer is already connected,
69    /// blocked or already has an active reservation).
70    Dial {
71        /// The public key of the peer to reserve.
72        public_key: C,
73
74        /// Sender to respond with the reservation and ingress address.
75        reservation: oneshot::Sender<Option<(Reservation<C>, Ingress)>>,
76    },
77
78    // ---------- Used by listener ----------
79    /// Check if a peer is acceptable (can accept an incoming connection from them).
80    Acceptable {
81        /// The public key of the peer to check.
82        public_key: C,
83
84        /// The IP address the peer connected from.
85        source_ip: IpAddr,
86
87        /// The sender to respond with whether the peer is acceptable.
88        responder: oneshot::Sender<bool>,
89    },
90
91    /// Request a reservation for a particular peer.
92    ///
93    /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if  the
94    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
95    /// has an active reservation).
96    Listen {
97        /// The public key of the peer to reserve.
98        public_key: C,
99
100        /// The sender to respond with the reservation.
101        reservation: oneshot::Sender<Option<Reservation<C>>>,
102    },
103
104    // ---------- Used by reservation ----------
105    /// Release a reservation.
106    Release {
107        /// The metadata of the reservation to release.
108        metadata: Metadata<C>,
109    },
110}
111
112impl<C: PublicKey> UnboundedMailbox<Message<C>> {
113    /// Send a `Connect` message to the tracker.
114    pub fn connect(&mut self, public_key: C, peer: Mailbox<peer::Message>) {
115        self.0.send_lossy(Message::Connect { public_key, peer });
116    }
117
118    /// Request a list of dialable peers from the tracker.
119    ///
120    /// Returns an empty list if the tracker is shut down.
121    pub async fn dialable(&mut self) -> Vec<C> {
122        self.0
123            .request_or_default(|responder| Message::Dialable { responder })
124            .await
125    }
126
127    /// Send a `Dial` message to the tracker.
128    ///
129    /// Returns `None` if the tracker is shut down.
130    pub async fn dial(&mut self, public_key: C) -> Option<(Reservation<C>, Ingress)> {
131        self.0
132            .request(|reservation| Message::Dial {
133                public_key,
134                reservation,
135            })
136            .await
137            .flatten()
138    }
139
140    /// Send an `Acceptable` message to the tracker.
141    ///
142    /// Returns `false` if the tracker is shut down.
143    pub async fn acceptable(&mut self, public_key: C, source_ip: IpAddr) -> bool {
144        self.0
145            .request_or(
146                |responder| Message::Acceptable {
147                    public_key,
148                    source_ip,
149                    responder,
150                },
151                false,
152            )
153            .await
154    }
155
156    /// Send a `Listen` message to the tracker.
157    ///
158    /// Returns `None` if the tracker is shut down.
159    pub async fn listen(&mut self, public_key: C) -> Option<Reservation<C>> {
160        self.0
161            .request(|reservation| Message::Listen {
162                public_key,
163                reservation,
164            })
165            .await
166            .flatten()
167    }
168}
169
170/// Allows releasing reservations
171#[derive(Clone, Debug)]
172pub struct Releaser<C: PublicKey> {
173    sender: UnboundedMailbox<Message<C>>,
174}
175
176impl<C: PublicKey> Releaser<C> {
177    /// Create a new releaser.
178    pub(crate) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
179        Self { sender }
180    }
181
182    /// Release a reservation.
183    pub fn release(&mut self, metadata: Metadata<C>) {
184        self.sender.0.send_lossy(Message::Release { metadata });
185    }
186}
187
188/// Mechanism to register authorized peers.
189///
190/// Peers that are not explicitly authorized
191/// will be blocked by commonware-p2p.
192#[derive(Debug, Clone)]
193pub struct Oracle<C: PublicKey> {
194    sender: UnboundedMailbox<Message<C>>,
195}
196
197impl<C: PublicKey> Oracle<C> {
198    pub(super) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
199        Self { sender }
200    }
201}
202
203impl<C: PublicKey> crate::Provider for Oracle<C> {
204    type PublicKey = C;
205
206    async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
207        self.sender
208            .0
209            .request(|responder| Message::PeerSet {
210                index: id,
211                responder,
212            })
213            .await
214            .flatten()
215    }
216
217    async fn subscribe(
218        &mut self,
219    ) -> mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)> {
220        self.sender
221            .0
222            .request(|responder| Message::Subscribe { responder })
223            .await
224            .unwrap_or_else(|| {
225                let (_, rx) = mpsc::unbounded_channel();
226                rx
227            })
228    }
229}
230
231impl<C: PublicKey> crate::AddressableManager for Oracle<C> {
232    async fn track(&mut self, index: u64, peers: Map<Self::PublicKey, Address>) {
233        self.sender.0.send_lossy(Message::Register { index, peers });
234    }
235
236    async fn overwrite(&mut self, peers: Map<Self::PublicKey, Address>) {
237        self.sender.0.send_lossy(Message::Overwrite { peers });
238    }
239}
240
241impl<C: PublicKey> crate::Blocker for Oracle<C> {
242    type PublicKey = C;
243
244    async fn block(&mut self, public_key: Self::PublicKey) {
245        self.sender.0.send_lossy(Message::Block { public_key });
246    }
247}