Skip to main content

commonware_p2p/authenticated/lookup/actors/tracker/
ingress.rs

1use super::Reservation;
2use crate::{
3    authenticated::{
4        dialing::Dialable,
5        lookup::actors::{peer, tracker::Metadata},
6    },
7    types::Address,
8    AddressableTrackedPeers, Ingress, PeerSetSubscription, TrackedPeers,
9};
10use commonware_actor::{
11    mailbox::{self, Policy},
12    Feedback,
13};
14use commonware_cryptography::PublicKey;
15use commonware_utils::{
16    channel::{mpsc, oneshot},
17    ordered::Map,
18};
19use std::{collections::VecDeque, net::IpAddr};
20
21/// Messages that can be sent to the tracker actor.
22#[derive(Debug)]
23pub enum Message<C: PublicKey> {
24    // ---------- Used by oracle ----------
25    /// Register a peer set at a given index.
26    Register {
27        index: u64,
28        peers: AddressableTrackedPeers<C>,
29    },
30
31    /// Update addresses for multiple peers without creating a new peer set.
32    Overwrite { peers: Map<C, Address> },
33
34    // ---------- Used by peer set provider ----------
35    /// Fetch primary and secondary peers for a given ID.
36    PeerSet {
37        /// The index of the peer set to fetch.
38        index: u64,
39        /// One-shot channel to send the tracked peers.
40        responder: oneshot::Sender<Option<TrackedPeers<C>>>,
41    },
42    /// Subscribe to notifications when new peer sets are added.
43    Subscribe {
44        /// One-shot channel to send the subscription receiver.
45        responder: oneshot::Sender<PeerSetSubscription<C>>,
46    },
47
48    // ---------- Used by blocker ----------
49    /// Block a peer, disconnecting them if currently connected and preventing future connections
50    /// for as long as the peer remains in at least one active peer set.
51    Block { public_key: C },
52
53    // ---------- Used by peer ----------
54    /// Notify the tracker that a peer has been successfully connected.
55    Connect {
56        /// The public key of the peer.
57        public_key: C,
58
59        /// The mailbox of the peer actor.
60        peer: peer::Mailbox,
61    },
62
63    // ---------- Used by dialer ----------
64    /// Request a list of dialable peers.
65    Dialable {
66        /// One-shot channel to send the dialable peers and next query deadline.
67        responder: oneshot::Sender<Dialable<C>>,
68    },
69
70    /// Request a reservation for a particular peer to dial.
71    ///
72    /// The tracker will respond with an [`Option<(Reservation<C>, Ingress)>`], which will be
73    /// `None` if the reservation cannot be granted (e.g., if the peer is already connected,
74    /// blocked or already has an active reservation).
75    Dial {
76        /// The public key of the peer to reserve.
77        public_key: C,
78
79        /// Sender to respond with the reservation and ingress address.
80        reservation: oneshot::Sender<Option<(Reservation<C>, Ingress)>>,
81    },
82
83    // ---------- Used by listener ----------
84    /// Check if a peer is acceptable (can accept an incoming connection from them).
85    Acceptable {
86        /// The public key of the peer to check.
87        public_key: C,
88
89        /// The IP address the peer connected from.
90        source_ip: IpAddr,
91
92        /// The sender to respond with whether the peer is acceptable.
93        responder: oneshot::Sender<bool>,
94    },
95
96    /// Request a reservation for a particular peer.
97    ///
98    /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if  the
99    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
100    /// has an active reservation).
101    Listen {
102        /// The public key of the peer to reserve.
103        public_key: C,
104
105        /// The IP address the peer connected from.
106        source_ip: IpAddr,
107
108        /// The sender to respond with the reservation.
109        reservation: oneshot::Sender<Option<Reservation<C>>>,
110    },
111
112    // ---------- Used by reservation ----------
113    /// Release a reservation.
114    Release {
115        /// The metadata of the reservation to release.
116        metadata: Metadata<C>,
117    },
118}
119
120impl<C: PublicKey> Policy for Message<C> {
121    type Overflow = VecDeque<Self>;
122
123    fn handle(overflow: &mut Self::Overflow, message: Self) {
124        overflow.push_back(message);
125    }
126}
127
128/// Mailbox for sending messages to the tracker actor.
129#[derive(Clone, Debug)]
130pub struct Mailbox<C: PublicKey>(mailbox::Sender<Message<C>>);
131
132impl<C: PublicKey> Mailbox<C> {
133    pub(crate) const fn new(sender: mailbox::Sender<Message<C>>) -> Self {
134        Self(sender)
135    }
136
137    /// Send a `Connect` message to the tracker.
138    pub(crate) fn connect(&self, public_key: C, peer: peer::Mailbox) -> Feedback {
139        self.0.enqueue(Message::Connect { public_key, peer })
140    }
141
142    /// Request dialable peers from the tracker.
143    ///
144    /// Returns an empty response if the tracker is shut down.
145    pub(crate) async fn dialable(&self) -> Dialable<C> {
146        let (responder, receiver) = oneshot::channel();
147        let _ = self.0.enqueue(Message::Dialable { responder });
148        receiver.await.unwrap_or_default()
149    }
150
151    /// Send a `Dial` message to the tracker.
152    ///
153    /// Returns `None` if the tracker is shut down.
154    pub(crate) async fn dial(&self, public_key: C) -> Option<(Reservation<C>, Ingress)> {
155        let (reservation, receiver) = oneshot::channel();
156        let _ = self.0.enqueue(Message::Dial {
157            public_key,
158            reservation,
159        });
160        receiver.await.ok().flatten()
161    }
162
163    /// Send an `Acceptable` message to the tracker.
164    ///
165    /// Returns `false` if the tracker is shut down.
166    pub(crate) async fn acceptable(&self, public_key: C, source_ip: IpAddr) -> bool {
167        let (responder, receiver) = oneshot::channel();
168        let _ = self.0.enqueue(Message::Acceptable {
169            public_key,
170            source_ip,
171            responder,
172        });
173        receiver.await.unwrap_or(false)
174    }
175
176    /// Send a `Listen` message to the tracker.
177    ///
178    /// Returns `None` if the tracker is shut down.
179    pub(crate) async fn listen(&self, public_key: C, source_ip: IpAddr) -> Option<Reservation<C>> {
180        let (reservation, receiver) = oneshot::channel();
181        let _ = self.0.enqueue(Message::Listen {
182            public_key,
183            source_ip,
184            reservation,
185        });
186        receiver.await.ok().flatten()
187    }
188}
189
190/// Allows releasing reservations
191#[derive(Clone, Debug)]
192pub struct Releaser<C: PublicKey> {
193    sender: mailbox::Sender<Message<C>>,
194}
195
196impl<C: PublicKey> Releaser<C> {
197    /// Create a new releaser.
198    pub(crate) const fn new(sender: mailbox::Sender<Message<C>>) -> Self {
199        Self { sender }
200    }
201
202    /// Release a reservation.
203    pub fn release(&mut self, metadata: Metadata<C>) -> Feedback {
204        self.sender.enqueue(Message::Release { metadata })
205    }
206}
207
208/// Mechanism to register authorized peers.
209///
210/// Peers that are not explicitly authorized
211/// will be blocked by commonware-p2p.
212#[derive(Debug, Clone)]
213pub struct Oracle<C: PublicKey> {
214    sender: mailbox::Sender<Message<C>>,
215}
216
217impl<C: PublicKey> Oracle<C> {
218    pub(super) const fn new(sender: mailbox::Sender<Message<C>>) -> Self {
219        Self { sender }
220    }
221}
222
223impl<C: PublicKey> crate::Provider for Oracle<C> {
224    type PublicKey = C;
225
226    async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
227        let (responder, receiver) = oneshot::channel();
228        let _ = self.sender.enqueue(Message::PeerSet {
229            index: id,
230            responder,
231        });
232        receiver.await.ok().flatten()
233    }
234
235    async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
236        let (responder, receiver) = oneshot::channel();
237        let _ = self.sender.enqueue(Message::Subscribe { responder });
238        receiver.await.unwrap_or_else(|_| {
239            let (_, rx) = mpsc::unbounded_channel();
240            rx
241        })
242    }
243}
244
245impl<C: PublicKey> crate::AddressableManager for Oracle<C> {
246    fn track<R>(&mut self, index: u64, peers: R) -> Feedback
247    where
248        R: Into<AddressableTrackedPeers<Self::PublicKey>> + Send,
249    {
250        self.sender.enqueue(Message::Register {
251            index,
252            peers: peers.into(),
253        })
254    }
255
256    fn overwrite(&mut self, peers: Map<Self::PublicKey, Address>) -> Feedback {
257        self.sender.enqueue(Message::Overwrite { peers })
258    }
259}
260
261impl<C: PublicKey> crate::Blocker for Oracle<C> {
262    type PublicKey = C;
263
264    fn block(&mut self, public_key: Self::PublicKey) -> Feedback {
265        self.sender.enqueue(Message::Block { public_key })
266    }
267}