commonware_p2p/authenticated/lookup/actors/tracker/ingress.rs
1use super::Reservation;
2use crate::{
3 authenticated::{
4 lookup::actors::{peer, tracker::Metadata},
5 mailbox::UnboundedMailbox,
6 Mailbox,
7 },
8 types::Address,
9 Ingress,
10};
11use commonware_cryptography::PublicKey;
12use commonware_utils::{
13 channels::fallible::FallibleExt,
14 ordered::{Map, Set},
15};
16use futures::channel::{mpsc, oneshot};
17use std::net::IpAddr;
18
19/// Messages that can be sent to the tracker actor.
20#[derive(Debug)]
21pub enum Message<C: PublicKey> {
22 // ---------- Used by oracle ----------
23 /// Register a peer set at a given index.
24 Register { index: u64, peers: Map<C, Address> },
25
26 // ---------- Used by peer set provider ----------
27 /// Fetch the peer set at a given index.
28 PeerSet {
29 /// The index of the peer set to fetch.
30 index: u64,
31 /// One-shot channel to send the peer set.
32 responder: oneshot::Sender<Option<Set<C>>>,
33 },
34 /// Subscribe to notifications when new peer sets are added.
35 Subscribe {
36 /// One-shot channel to send the subscription receiver.
37 #[allow(clippy::type_complexity)]
38 responder: oneshot::Sender<mpsc::UnboundedReceiver<(u64, Set<C>, Set<C>)>>,
39 },
40
41 // ---------- Used by blocker ----------
42 /// Block a peer, disconnecting them if currently connected and preventing future connections
43 /// for as long as the peer remains in at least one active peer set.
44 Block { public_key: C },
45
46 // ---------- Used by peer ----------
47 /// Notify the tracker that a peer has been successfully connected.
48 Connect {
49 /// The public key of the peer.
50 public_key: C,
51
52 /// The mailbox of the peer actor.
53 peer: Mailbox<peer::Message>,
54 },
55
56 // ---------- Used by dialer ----------
57 /// Request a list of dialable peers.
58 Dialable {
59 /// One-shot channel to send the list of dialable peers.
60 responder: oneshot::Sender<Vec<C>>,
61 },
62
63 /// Request a reservation for a particular peer to dial.
64 ///
65 /// The tracker will respond with an [`Option<(Reservation<C>, Ingress)>`], which will be
66 /// `None` if the reservation cannot be granted (e.g., if the peer is already connected,
67 /// blocked or already has an active reservation).
68 Dial {
69 /// The public key of the peer to reserve.
70 public_key: C,
71
72 /// Sender to respond with the reservation and ingress address.
73 reservation: oneshot::Sender<Option<(Reservation<C>, Ingress)>>,
74 },
75
76 // ---------- Used by listener ----------
77 /// Check if a peer is acceptable (can accept an incoming connection from them).
78 Acceptable {
79 /// The public key of the peer to check.
80 public_key: C,
81
82 /// The IP address the peer connected from.
83 source_ip: IpAddr,
84
85 /// The sender to respond with whether the peer is acceptable.
86 responder: oneshot::Sender<bool>,
87 },
88
89 /// Request a reservation for a particular peer.
90 ///
91 /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if the
92 /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
93 /// has an active reservation).
94 Listen {
95 /// The public key of the peer to reserve.
96 public_key: C,
97
98 /// The sender to respond with the reservation.
99 reservation: oneshot::Sender<Option<Reservation<C>>>,
100 },
101
102 // ---------- Used by reservation ----------
103 /// Release a reservation.
104 Release {
105 /// The metadata of the reservation to release.
106 metadata: Metadata<C>,
107 },
108}
109
110impl<C: PublicKey> UnboundedMailbox<Message<C>> {
111 /// Send a `Connect` message to the tracker.
112 pub fn connect(&mut self, public_key: C, peer: Mailbox<peer::Message>) {
113 self.0.send_lossy(Message::Connect { public_key, peer });
114 }
115
116 /// Request a list of dialable peers from the tracker.
117 ///
118 /// Returns an empty list if the tracker is shut down.
119 pub async fn dialable(&mut self) -> Vec<C> {
120 self.0
121 .request_or_default(|responder| Message::Dialable { responder })
122 .await
123 }
124
125 /// Send a `Dial` message to the tracker.
126 ///
127 /// Returns `None` if the tracker is shut down.
128 pub async fn dial(&mut self, public_key: C) -> Option<(Reservation<C>, Ingress)> {
129 self.0
130 .request(|reservation| Message::Dial {
131 public_key,
132 reservation,
133 })
134 .await
135 .flatten()
136 }
137
138 /// Send an `Acceptable` message to the tracker.
139 ///
140 /// Returns `false` if the tracker is shut down.
141 pub async fn acceptable(&mut self, public_key: C, source_ip: IpAddr) -> bool {
142 self.0
143 .request_or(
144 |responder| Message::Acceptable {
145 public_key,
146 source_ip,
147 responder,
148 },
149 false,
150 )
151 .await
152 }
153
154 /// Send a `Listen` message to the tracker.
155 ///
156 /// Returns `None` if the tracker is shut down.
157 pub async fn listen(&mut self, public_key: C) -> Option<Reservation<C>> {
158 self.0
159 .request(|reservation| Message::Listen {
160 public_key,
161 reservation,
162 })
163 .await
164 .flatten()
165 }
166}
167
168/// Allows releasing reservations
169#[derive(Clone)]
170pub struct Releaser<C: PublicKey> {
171 sender: UnboundedMailbox<Message<C>>,
172}
173
174impl<C: PublicKey> Releaser<C> {
175 /// Create a new releaser.
176 pub(super) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
177 Self { sender }
178 }
179
180 /// Release a reservation.
181 pub fn release(&mut self, metadata: Metadata<C>) {
182 self.sender.0.send_lossy(Message::Release { metadata });
183 }
184}
185
186/// Mechanism to register authorized peers.
187///
188/// Peers that are not explicitly authorized
189/// will be blocked by commonware-p2p.
190#[derive(Debug, Clone)]
191pub struct Oracle<C: PublicKey> {
192 sender: UnboundedMailbox<Message<C>>,
193}
194
195impl<C: PublicKey> Oracle<C> {
196 pub(super) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
197 Self { sender }
198 }
199}
200
201impl<C: PublicKey> crate::Manager for Oracle<C> {
202 type PublicKey = C;
203 type Peers = Map<C, Address>;
204
205 /// Register a set of authorized peers at a given index.
206 ///
207 /// # Parameters
208 ///
209 /// * `index` - Index of the set of authorized peers (like a blockchain height).
210 /// Should be monotonically increasing.
211 /// * `peers` - Vector of authorized peers at an `index`.
212 /// Each element contains the public key and address specification of the peer.
213 async fn update(&mut self, index: u64, peers: Self::Peers) {
214 self.sender.0.send_lossy(Message::Register { index, peers });
215 }
216
217 async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
218 self.sender
219 .0
220 .request(|responder| Message::PeerSet {
221 index: id,
222 responder,
223 })
224 .await
225 .flatten()
226 }
227
228 async fn subscribe(
229 &mut self,
230 ) -> mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)> {
231 self.sender
232 .0
233 .request(|responder| Message::Subscribe { responder })
234 .await
235 .unwrap_or_else(|| {
236 let (_, rx) = mpsc::unbounded();
237 rx
238 })
239 }
240}
241
242impl<C: PublicKey> crate::Blocker for Oracle<C> {
243 type PublicKey = C;
244
245 async fn block(&mut self, public_key: Self::PublicKey) {
246 self.sender.0.send_lossy(Message::Block { public_key });
247 }
248}