commonware_p2p/authenticated/lookup/actors/tracker/ingress.rs
1use super::Reservation;
2use crate::authenticated::{
3 lookup::actors::{peer, tracker::Metadata},
4 Mailbox,
5};
6use commonware_cryptography::PublicKey;
7use commonware_runtime::{Metrics, Spawner};
8use futures::{
9 channel::{mpsc, oneshot},
10 SinkExt,
11};
12use std::net::SocketAddr;
13
14/// Messages that can be sent to the tracker actor.
15pub enum Message<E: Spawner + Metrics, C: PublicKey> {
16 // ---------- Used by oracle ----------
17 /// Register a peer set at a given index.
18 Register {
19 index: u64,
20 peers: Vec<(C, SocketAddr)>,
21 },
22
23 // ---------- Used by blocker ----------
24 /// Block a peer, disconnecting them if currently connected and preventing future connections
25 /// for as long as the peer remains in at least one active peer set.
26 Block { public_key: C },
27
28 // ---------- Used by peer ----------
29 /// Notify the tracker that a peer has been successfully connected.
30 Connect {
31 /// The public key of the peer.
32 public_key: C,
33
34 /// The mailbox of the peer actor.
35 peer: Mailbox<peer::Message>,
36 },
37
38 // ---------- Used by dialer ----------
39 /// Request a list of dialable peers.
40 Dialable {
41 /// One-shot channel to send the list of dialable peers.
42 responder: oneshot::Sender<Vec<C>>,
43 },
44
45 /// Request a reservation for a particular peer to dial.
46 ///
47 /// The tracker will respond with an [Option<Reservation<E, C>>], which will be `None` if the
48 /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
49 /// has an active reservation).
50 Dial {
51 /// The public key of the peer to reserve.
52 public_key: C,
53
54 /// sender to respond with the reservation.
55 reservation: oneshot::Sender<Option<Reservation<E, C>>>,
56 },
57
58 // ---------- Used by listener ----------
59 /// Check if we should listen to a peer.
60 Listenable {
61 /// The public key of the peer to check.
62 public_key: C,
63
64 /// The sender to respond with the listenable status.
65 responder: oneshot::Sender<bool>,
66 },
67
68 /// Request a reservation for a particular peer.
69 ///
70 /// The tracker will respond with an [Option<Reservation<E, C>>], which will be `None` if the
71 /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
72 /// has an active reservation).
73 Listen {
74 /// The public key of the peer to reserve.
75 public_key: C,
76
77 /// The sender to respond with the reservation.
78 reservation: oneshot::Sender<Option<Reservation<E, C>>>,
79 },
80
81 // ---------- Used by reservation ----------
82 /// Release a reservation.
83 Release {
84 /// The metadata of the reservation to release.
85 metadata: Metadata<C>,
86 },
87}
88
89impl<E: Spawner + Metrics, C: PublicKey> Mailbox<Message<E, C>> {
90 /// Send a `Connect` message to the tracker.
91 pub async fn connect(&mut self, public_key: C, peer: Mailbox<peer::Message>) {
92 self.send(Message::Connect { public_key, peer })
93 .await
94 .unwrap();
95 }
96
97 /// Send a `Block` message to the tracker.
98 pub async fn dialable(&mut self) -> Vec<C> {
99 let (sender, receiver) = oneshot::channel();
100 self.send(Message::Dialable { responder: sender })
101 .await
102 .unwrap();
103 receiver.await.unwrap()
104 }
105
106 /// Send a `Dial` message to the tracker.
107 pub async fn dial(&mut self, public_key: C) -> Option<Reservation<E, C>> {
108 let (tx, rx) = oneshot::channel();
109 self.send(Message::Dial {
110 public_key,
111 reservation: tx,
112 })
113 .await
114 .unwrap();
115 rx.await.unwrap()
116 }
117
118 /// Send a `Listenable` message to the tracker.
119 pub async fn listenable(&mut self, public_key: C) -> bool {
120 let (tx, rx) = oneshot::channel();
121 self.send(Message::Listenable {
122 public_key,
123 responder: tx,
124 })
125 .await
126 .unwrap();
127 rx.await.unwrap()
128 }
129
130 /// Send a `Listen` message to the tracker.
131 pub async fn listen(&mut self, public_key: C) -> Option<Reservation<E, C>> {
132 let (tx, rx) = oneshot::channel();
133 self.send(Message::Listen {
134 public_key,
135 reservation: tx,
136 })
137 .await
138 .unwrap();
139 rx.await.unwrap()
140 }
141}
142
143/// Allows releasing reservations
144#[derive(Clone)]
145pub struct Releaser<E: Spawner + Metrics, C: PublicKey> {
146 sender: mpsc::Sender<Message<E, C>>,
147}
148
149impl<E: Spawner + Metrics, C: PublicKey> Releaser<E, C> {
150 /// Create a new releaser.
151 pub(super) fn new(sender: mpsc::Sender<Message<E, C>>) -> Self {
152 Self { sender }
153 }
154
155 /// Try to release a reservation.
156 ///
157 /// Returns `true` if the reservation was released, `false` if the mailbox is full.
158 pub fn try_release(&mut self, metadata: Metadata<C>) -> bool {
159 let Err(e) = self.sender.try_send(Message::Release { metadata }) else {
160 return true;
161 };
162 assert!(
163 e.is_full(),
164 "Unexpected error trying to release reservation {e:?}"
165 );
166 false
167 }
168
169 /// Release a reservation.
170 ///
171 /// This method will block if the mailbox is full.
172 pub async fn release(&mut self, metadata: Metadata<C>) {
173 self.sender
174 .send(Message::Release { metadata })
175 .await
176 .unwrap();
177 }
178}
179
180/// Mechanism to register authorized peers.
181///
182/// Peers that are not explicitly authorized
183/// will be blocked by commonware-p2p.
184#[derive(Clone)]
185pub struct Oracle<E: Spawner + Metrics, C: PublicKey> {
186 sender: mpsc::Sender<Message<E, C>>,
187}
188
189impl<E: Spawner + Metrics, C: PublicKey> Oracle<E, C> {
190 pub(super) fn new(sender: mpsc::Sender<Message<E, C>>) -> Self {
191 Self { sender }
192 }
193
194 /// Register a set of authorized peers at a given index.
195 ///
196 /// # Parameters
197 ///
198 /// * `index` - Index of the set of authorized peers (like a blockchain height).
199 /// Should be monotonically increasing.
200 /// * `peers` - Vector of authorized peers at an `index` (does not need to be sorted).
201 /// Each element is a tuple containing the public key and the socket address of the peer.
202 pub async fn register(&mut self, index: u64, peers: Vec<(C, SocketAddr)>) {
203 let _ = self.sender.send(Message::Register { index, peers }).await;
204 }
205}
206
207impl<E: Spawner + Metrics, C: PublicKey> crate::Blocker for Oracle<E, C> {
208 type PublicKey = C;
209
210 async fn block(&mut self, public_key: Self::PublicKey) {
211 let _ = self.sender.send(Message::Block { public_key }).await;
212 }
213}