Skip to main content

commonware_p2p/authenticated/lookup/actors/tracker/
ingress.rs

1use super::Reservation;
2use crate::{
3    authenticated::{
4        dialing::Dialable,
5        lookup::actors::{peer, tracker::Metadata},
6        mailbox::UnboundedMailbox,
7        Mailbox,
8    },
9    types::Address,
10    AddressableTrackedPeers, Ingress, PeerSetSubscription, TrackedPeers,
11};
12use commonware_cryptography::PublicKey;
13use commonware_utils::{
14    channel::{fallible::FallibleExt, mpsc, oneshot},
15    ordered::Map,
16};
17use std::net::IpAddr;
18
19/// Messages that can be sent to the tracker actor.
20#[derive(Debug)]
21pub enum Message<C: PublicKey> {
22    // ---------- Used by oracle ----------
23    /// Register a peer set at a given index.
24    Register {
25        index: u64,
26        peers: AddressableTrackedPeers<C>,
27    },
28
29    /// Update addresses for multiple peers without creating a new peer set.
30    Overwrite { peers: Map<C, Address> },
31
32    // ---------- Used by peer set provider ----------
33    /// Fetch primary and secondary peers for a given ID.
34    PeerSet {
35        /// The index of the peer set to fetch.
36        index: u64,
37        /// One-shot channel to send the tracked peers.
38        responder: oneshot::Sender<Option<TrackedPeers<C>>>,
39    },
40    /// Subscribe to notifications when new peer sets are added.
41    Subscribe {
42        /// One-shot channel to send the subscription receiver.
43        responder: oneshot::Sender<PeerSetSubscription<C>>,
44    },
45
46    // ---------- Used by blocker ----------
47    /// Block a peer, disconnecting them if currently connected and preventing future connections
48    /// for as long as the peer remains in at least one active peer set.
49    Block { public_key: C },
50
51    // ---------- Used by peer ----------
52    /// Notify the tracker that a peer has been successfully connected.
53    Connect {
54        /// The public key of the peer.
55        public_key: C,
56
57        /// The mailbox of the peer actor.
58        peer: Mailbox<peer::Message>,
59    },
60
61    // ---------- Used by dialer ----------
62    /// Request a list of dialable peers.
63    Dialable {
64        /// One-shot channel to send the dialable peers and next query deadline.
65        responder: oneshot::Sender<Dialable<C>>,
66    },
67
68    /// Request a reservation for a particular peer to dial.
69    ///
70    /// The tracker will respond with an [`Option<(Reservation<C>, Ingress)>`], which will be
71    /// `None` if the reservation cannot be granted (e.g., if the peer is already connected,
72    /// blocked or already has an active reservation).
73    Dial {
74        /// The public key of the peer to reserve.
75        public_key: C,
76
77        /// Sender to respond with the reservation and ingress address.
78        reservation: oneshot::Sender<Option<(Reservation<C>, Ingress)>>,
79    },
80
81    // ---------- Used by listener ----------
82    /// Check if a peer is acceptable (can accept an incoming connection from them).
83    Acceptable {
84        /// The public key of the peer to check.
85        public_key: C,
86
87        /// The IP address the peer connected from.
88        source_ip: IpAddr,
89
90        /// The sender to respond with whether the peer is acceptable.
91        responder: oneshot::Sender<bool>,
92    },
93
94    /// Request a reservation for a particular peer.
95    ///
96    /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if  the
97    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
98    /// has an active reservation).
99    Listen {
100        /// The public key of the peer to reserve.
101        public_key: C,
102
103        /// The sender to respond with the reservation.
104        reservation: oneshot::Sender<Option<Reservation<C>>>,
105    },
106
107    // ---------- Used by reservation ----------
108    /// Release a reservation.
109    Release {
110        /// The metadata of the reservation to release.
111        metadata: Metadata<C>,
112    },
113}
114
115impl<C: PublicKey> UnboundedMailbox<Message<C>> {
116    /// Send a `Connect` message to the tracker.
117    pub fn connect(&mut self, public_key: C, peer: Mailbox<peer::Message>) {
118        self.0.send_lossy(Message::Connect { public_key, peer });
119    }
120
121    /// Request dialable peers from the tracker.
122    ///
123    /// Returns an empty response if the tracker is shut down.
124    pub async fn dialable(&mut self) -> Dialable<C> {
125        self.0
126            .request_or_default(|responder| Message::Dialable { responder })
127            .await
128    }
129
130    /// Send a `Dial` message to the tracker.
131    ///
132    /// Returns `None` if the tracker is shut down.
133    pub async fn dial(&mut self, public_key: C) -> Option<(Reservation<C>, Ingress)> {
134        self.0
135            .request(|reservation| Message::Dial {
136                public_key,
137                reservation,
138            })
139            .await
140            .flatten()
141    }
142
143    /// Send an `Acceptable` message to the tracker.
144    ///
145    /// Returns `false` if the tracker is shut down.
146    pub async fn acceptable(&mut self, public_key: C, source_ip: IpAddr) -> bool {
147        self.0
148            .request_or(
149                |responder| Message::Acceptable {
150                    public_key,
151                    source_ip,
152                    responder,
153                },
154                false,
155            )
156            .await
157    }
158
159    /// Send a `Listen` message to the tracker.
160    ///
161    /// Returns `None` if the tracker is shut down.
162    pub async fn listen(&mut self, public_key: C) -> Option<Reservation<C>> {
163        self.0
164            .request(|reservation| Message::Listen {
165                public_key,
166                reservation,
167            })
168            .await
169            .flatten()
170    }
171}
172
173/// Allows releasing reservations
174#[derive(Clone, Debug)]
175pub struct Releaser<C: PublicKey> {
176    sender: UnboundedMailbox<Message<C>>,
177}
178
179impl<C: PublicKey> Releaser<C> {
180    /// Create a new releaser.
181    pub(crate) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
182        Self { sender }
183    }
184
185    /// Release a reservation.
186    pub fn release(&mut self, metadata: Metadata<C>) {
187        self.sender.0.send_lossy(Message::Release { metadata });
188    }
189}
190
191/// Mechanism to register authorized peers.
192///
193/// Peers that are not explicitly authorized
194/// will be blocked by commonware-p2p.
195#[derive(Debug, Clone)]
196pub struct Oracle<C: PublicKey> {
197    sender: UnboundedMailbox<Message<C>>,
198}
199
200impl<C: PublicKey> Oracle<C> {
201    pub(super) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
202        Self { sender }
203    }
204}
205
206impl<C: PublicKey> crate::Provider for Oracle<C> {
207    type PublicKey = C;
208
209    async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
210        self.sender
211            .0
212            .request(|responder| Message::PeerSet {
213                index: id,
214                responder,
215            })
216            .await
217            .flatten()
218    }
219
220    async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
221        self.sender
222            .0
223            .request(|responder| Message::Subscribe { responder })
224            .await
225            .unwrap_or_else(|| {
226                let (_, rx) = mpsc::unbounded_channel();
227                rx
228            })
229    }
230}
231
232impl<C: PublicKey> crate::AddressableManager for Oracle<C> {
233    async fn track<R>(&mut self, index: u64, peers: R)
234    where
235        R: Into<AddressableTrackedPeers<Self::PublicKey>> + Send,
236    {
237        self.sender.0.send_lossy(Message::Register {
238            index,
239            peers: peers.into(),
240        });
241    }
242
243    async fn overwrite(&mut self, peers: Map<Self::PublicKey, Address>) {
244        self.sender.0.send_lossy(Message::Overwrite { peers });
245    }
246}
247
248impl<C: PublicKey> crate::Blocker for Oracle<C> {
249    type PublicKey = C;
250
251    async fn block(&mut self, public_key: Self::PublicKey) {
252        self.sender.0.send_lossy(Message::Block { public_key });
253    }
254}