commonware_p2p/authenticated/lookup/actors/tracker/ingress.rs
1use super::Reservation;
2use crate::{
3 authenticated::{
4 lookup::actors::{peer, tracker::Metadata},
5 mailbox::UnboundedMailbox,
6 Mailbox,
7 },
8 types::Address,
9 Ingress, PeerSetSubscription,
10};
11use commonware_cryptography::PublicKey;
12use commonware_utils::{
13 channel::{fallible::FallibleExt, mpsc, oneshot},
14 ordered::{Map, Set},
15};
16use std::net::IpAddr;
17
18/// Messages that can be sent to the tracker actor.
19#[derive(Debug)]
20pub enum Message<C: PublicKey> {
21 // ---------- Used by oracle ----------
22 /// Register a peer set at a given index.
23 Register { index: u64, peers: Map<C, Address> },
24
25 /// Update addresses for multiple peers without creating a new peer set.
26 Overwrite { peers: Map<C, Address> },
27
28 // ---------- Used by peer set provider ----------
29 /// Fetch the peer set at a given index.
30 PeerSet {
31 /// The index of the peer set to fetch.
32 index: u64,
33 /// One-shot channel to send the peer set.
34 responder: oneshot::Sender<Option<Set<C>>>,
35 },
36 /// Subscribe to notifications when new peer sets are added.
37 Subscribe {
38 /// One-shot channel to send the subscription receiver.
39 responder: oneshot::Sender<PeerSetSubscription<C>>,
40 },
41
42 // ---------- Used by blocker ----------
43 /// Block a peer, disconnecting them if currently connected and preventing future connections
44 /// for as long as the peer remains in at least one active peer set.
45 Block { public_key: C },
46
47 // ---------- Used by peer ----------
48 /// Notify the tracker that a peer has been successfully connected.
49 Connect {
50 /// The public key of the peer.
51 public_key: C,
52
53 /// The mailbox of the peer actor.
54 peer: Mailbox<peer::Message>,
55 },
56
57 // ---------- Used by dialer ----------
58 /// Request a list of dialable peers.
59 Dialable {
60 /// One-shot channel to send the list of dialable peers.
61 responder: oneshot::Sender<Vec<C>>,
62 },
63
64 /// Request a reservation for a particular peer to dial.
65 ///
66 /// The tracker will respond with an [`Option<(Reservation<C>, Ingress)>`], which will be
67 /// `None` if the reservation cannot be granted (e.g., if the peer is already connected,
68 /// blocked or already has an active reservation).
69 Dial {
70 /// The public key of the peer to reserve.
71 public_key: C,
72
73 /// Sender to respond with the reservation and ingress address.
74 reservation: oneshot::Sender<Option<(Reservation<C>, Ingress)>>,
75 },
76
77 // ---------- Used by listener ----------
78 /// Check if a peer is acceptable (can accept an incoming connection from them).
79 Acceptable {
80 /// The public key of the peer to check.
81 public_key: C,
82
83 /// The IP address the peer connected from.
84 source_ip: IpAddr,
85
86 /// The sender to respond with whether the peer is acceptable.
87 responder: oneshot::Sender<bool>,
88 },
89
90 /// Request a reservation for a particular peer.
91 ///
92 /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if the
93 /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
94 /// has an active reservation).
95 Listen {
96 /// The public key of the peer to reserve.
97 public_key: C,
98
99 /// The sender to respond with the reservation.
100 reservation: oneshot::Sender<Option<Reservation<C>>>,
101 },
102
103 // ---------- Used by reservation ----------
104 /// Release a reservation.
105 Release {
106 /// The metadata of the reservation to release.
107 metadata: Metadata<C>,
108 },
109}
110
111impl<C: PublicKey> UnboundedMailbox<Message<C>> {
112 /// Send a `Connect` message to the tracker.
113 pub fn connect(&mut self, public_key: C, peer: Mailbox<peer::Message>) {
114 self.0.send_lossy(Message::Connect { public_key, peer });
115 }
116
117 /// Request a list of dialable peers from the tracker.
118 ///
119 /// Returns an empty list if the tracker is shut down.
120 pub async fn dialable(&mut self) -> Vec<C> {
121 self.0
122 .request_or_default(|responder| Message::Dialable { responder })
123 .await
124 }
125
126 /// Send a `Dial` message to the tracker.
127 ///
128 /// Returns `None` if the tracker is shut down.
129 pub async fn dial(&mut self, public_key: C) -> Option<(Reservation<C>, Ingress)> {
130 self.0
131 .request(|reservation| Message::Dial {
132 public_key,
133 reservation,
134 })
135 .await
136 .flatten()
137 }
138
139 /// Send an `Acceptable` message to the tracker.
140 ///
141 /// Returns `false` if the tracker is shut down.
142 pub async fn acceptable(&mut self, public_key: C, source_ip: IpAddr) -> bool {
143 self.0
144 .request_or(
145 |responder| Message::Acceptable {
146 public_key,
147 source_ip,
148 responder,
149 },
150 false,
151 )
152 .await
153 }
154
155 /// Send a `Listen` message to the tracker.
156 ///
157 /// Returns `None` if the tracker is shut down.
158 pub async fn listen(&mut self, public_key: C) -> Option<Reservation<C>> {
159 self.0
160 .request(|reservation| Message::Listen {
161 public_key,
162 reservation,
163 })
164 .await
165 .flatten()
166 }
167}
168
169/// Allows releasing reservations
170#[derive(Clone, Debug)]
171pub struct Releaser<C: PublicKey> {
172 sender: UnboundedMailbox<Message<C>>,
173}
174
175impl<C: PublicKey> Releaser<C> {
176 /// Create a new releaser.
177 pub(crate) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
178 Self { sender }
179 }
180
181 /// Release a reservation.
182 pub fn release(&mut self, metadata: Metadata<C>) {
183 self.sender.0.send_lossy(Message::Release { metadata });
184 }
185}
186
187/// Mechanism to register authorized peers.
188///
189/// Peers that are not explicitly authorized
190/// will be blocked by commonware-p2p.
191#[derive(Debug, Clone)]
192pub struct Oracle<C: PublicKey> {
193 sender: UnboundedMailbox<Message<C>>,
194}
195
196impl<C: PublicKey> Oracle<C> {
197 pub(super) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
198 Self { sender }
199 }
200}
201
202impl<C: PublicKey> crate::Provider for Oracle<C> {
203 type PublicKey = C;
204
205 async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
206 self.sender
207 .0
208 .request(|responder| Message::PeerSet {
209 index: id,
210 responder,
211 })
212 .await
213 .flatten()
214 }
215
216 async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
217 self.sender
218 .0
219 .request(|responder| Message::Subscribe { responder })
220 .await
221 .unwrap_or_else(|| {
222 let (_, rx) = mpsc::unbounded_channel();
223 rx
224 })
225 }
226}
227
228impl<C: PublicKey> crate::AddressableManager for Oracle<C> {
229 async fn track(&mut self, index: u64, peers: Map<Self::PublicKey, Address>) {
230 self.sender.0.send_lossy(Message::Register { index, peers });
231 }
232
233 async fn overwrite(&mut self, peers: Map<Self::PublicKey, Address>) {
234 self.sender.0.send_lossy(Message::Overwrite { peers });
235 }
236}
237
238impl<C: PublicKey> crate::Blocker for Oracle<C> {
239 type PublicKey = C;
240
241 async fn block(&mut self, public_key: Self::PublicKey) {
242 self.sender.0.send_lossy(Message::Block { public_key });
243 }
244}