1use super::{Error, Receiver, Sender};
2use crate::{
3 authenticated::UnboundedMailbox, Address, AddressableTrackedPeers, Channel,
4 PeerSetSubscription, TrackedPeers,
5};
6use commonware_cryptography::PublicKey;
7use commonware_runtime::{Clock, Quota};
8use commonware_utils::{
9 channel::{fallible::FallibleExt, mpsc, oneshot, ring},
10 ordered::Map,
11};
12use rand_distr::Normal;
13use std::time::Duration;
14
/// Requests delivered to the network through the mailbox held by [`Oracle`] and [`Control`].
///
/// Variants that carry a `result` or `response` field include a oneshot sender on which
/// the handler is expected to reply (the handler itself is not defined in this file).
pub enum Message<P: PublicKey, E: Clock> {
    /// Register `public_key` on `channel` with the given `quota`.
    ///
    /// Sent by [`Control::register`]; on success the reply carries the
    /// [`Sender`]/[`Receiver`] pair for that channel.
    Register {
        /// Channel to register on.
        channel: Channel,
        /// Identity of the registering peer.
        public_key: P,
        /// Quota attached to the registration (presumably a rate limit; confirm against the handler).
        quota: Quota,
        /// Reply channel for the registration outcome.
        #[allow(clippy::type_complexity)]
        result: oneshot::Sender<Result<(Sender<P, E>, Receiver<P>), Error>>,
    },
    /// Record `peers` as the peer set with identifier `id`.
    ///
    /// Carries no reply channel.
    Track {
        /// Identifier of the peer set.
        id: u64,
        /// Peers belonging to the set.
        peers: TrackedPeers<P>,
    },
    /// Look up the peer set with identifier `id`.
    PeerSet {
        /// Identifier of the peer set to look up.
        id: u64,
        /// Reply channel; carries `None` when the handler has no set to return.
        response: oneshot::Sender<Option<TrackedPeers<P>>>,
    },
    /// Request a subscription to peer set updates.
    Subscribe {
        /// Reply channel carrying the subscription.
        response: oneshot::Sender<PeerSetSubscription<P>>,
    },
    /// Request a ring receiver yielding `Vec<P>` values.
    ///
    /// NOTE(review): presumably these are snapshots of connected peers; this variant is not
    /// constructed in this file, so confirm against the handler.
    SubscribeConnected {
        /// Reply channel carrying the ring receiver.
        response: oneshot::Sender<ring::Receiver<Vec<P>>>,
    },
    /// Set the bandwidth caps for `public_key`.
    LimitBandwidth {
        /// Peer whose bandwidth is being limited.
        public_key: P,
        /// Egress cap (units and the meaning of `None` are defined by the handler).
        egress_cap: Option<usize>,
        /// Ingress cap (units and the meaning of `None` are defined by the handler).
        ingress_cap: Option<usize>,
        /// Reply channel signalling that the request was handled.
        result: oneshot::Sender<()>,
    },
    /// Add a link from `sender` to `receiver`.
    AddLink {
        /// Peer on the sending side of the link.
        sender: P,
        /// Peer on the receiving side of the link.
        receiver: P,
        /// Latency distribution, parameterized in milliseconds by [`Oracle::add_link`].
        sampler: Normal<f64>,
        /// Success rate of the link, validated to lie in `[0, 1]` by [`Oracle::add_link`].
        success_rate: f64,
        /// Reply channel for the outcome.
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Remove the link from `sender` to `receiver`.
    RemoveLink {
        /// Peer on the sending side of the link.
        sender: P,
        /// Peer on the receiving side of the link.
        receiver: P,
        /// Reply channel for the outcome.
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Record that `from` blocks `to`.
    ///
    /// Carries no reply channel; sent by [`Control`]'s `Blocker` implementation via `send_lossy`.
    Block {
        /// Peer issuing the block.
        from: P,
        /// Peer being blocked.
        to: P,
    },
    /// Request the list of blocked pairs.
    Blocked {
        /// Reply channel carrying pairs of peers (presumably `(from, to)`; confirm against the handler).
        result: oneshot::Sender<Result<Vec<(P, P)>, Error>>,
    },
}
65
66impl<P: PublicKey, E: Clock> std::fmt::Debug for Message<P, E> {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 match self {
69 Self::Register { .. } => f.debug_struct("Register").finish_non_exhaustive(),
70 Self::Track { id, .. } => f
71 .debug_struct("Track")
72 .field("id", id)
73 .finish_non_exhaustive(),
74 Self::PeerSet { id, .. } => f
75 .debug_struct("PeerSet")
76 .field("id", id)
77 .finish_non_exhaustive(),
78 Self::Subscribe { .. } => f.debug_struct("Subscribe").finish_non_exhaustive(),
79 Self::SubscribeConnected { .. } => {
80 f.debug_struct("SubscribeConnected").finish_non_exhaustive()
81 }
82 Self::LimitBandwidth { .. } => f.debug_struct("LimitBandwidth").finish_non_exhaustive(),
83 Self::AddLink { .. } => f.debug_struct("AddLink").finish_non_exhaustive(),
84 Self::RemoveLink { .. } => f.debug_struct("RemoveLink").finish_non_exhaustive(),
85 Self::Block { from, to } => f
86 .debug_struct("Block")
87 .field("from", from)
88 .field("to", to)
89 .finish(),
90 Self::Blocked { .. } => f.debug_struct("Blocked").finish_non_exhaustive(),
91 }
92 }
93}
94
/// Parameters of a link between two peers, as passed to [`Oracle::add_link`].
///
/// `Debug` is derived so that, like every other public type in this module, a `Link`
/// can be logged and used in assertion messages.
#[derive(Clone, Debug)]
pub struct Link {
    /// Mean of the latency distribution built by [`Oracle::add_link`].
    pub latency: Duration,

    /// Standard deviation of the latency distribution built by [`Oracle::add_link`].
    pub jitter: Duration,

    /// Success rate of the link; must lie in `[0, 1]` or [`Oracle::add_link`] rejects it
    /// (presumably the probability that a message is delivered).
    pub success_rate: f64,
}
110
/// Handle for sending [`Message`]s to the network.
///
/// Also hands out the more specific [`Control`], [`Manager`], and [`SocketManager`] handles.
#[derive(Debug)]
pub struct Oracle<P: PublicKey, E: Clock> {
    /// Mailbox over which every request is sent.
    sender: UnboundedMailbox<Message<P, E>>,
}
119
120impl<P: PublicKey, E: Clock> Clone for Oracle<P, E> {
121 fn clone(&self) -> Self {
122 Self {
123 sender: self.sender.clone(),
124 }
125 }
126}
127
128impl<P: PublicKey, E: Clock> Oracle<P, E> {
129 pub(crate) const fn new(sender: UnboundedMailbox<Message<P, E>>) -> Self {
131 Self { sender }
132 }
133
134 pub fn control(&self, me: P) -> Control<P, E> {
136 Control {
137 me,
138 sender: self.sender.clone(),
139 }
140 }
141
142 pub fn manager(&self) -> Manager<P, E> {
146 Manager {
147 oracle: self.clone(),
148 }
149 }
150
151 pub fn socket_manager(&self) -> SocketManager<P, E> {
155 SocketManager {
156 oracle: self.clone(),
157 }
158 }
159
160 pub async fn blocked(&self) -> Result<Vec<(P, P)>, Error> {
162 self.sender
163 .0
164 .request(|result| Message::Blocked { result })
165 .await
166 .ok_or(Error::NetworkClosed)?
167 }
168
169 pub async fn limit_bandwidth(
176 &self,
177 public_key: P,
178 egress_cap: Option<usize>,
179 ingress_cap: Option<usize>,
180 ) -> Result<(), Error> {
181 self.sender
182 .0
183 .request(|result| Message::LimitBandwidth {
184 public_key,
185 egress_cap,
186 ingress_cap,
187 result,
188 })
189 .await
190 .ok_or(Error::NetworkClosed)
191 }
192
193 pub async fn add_link(&self, sender: P, receiver: P, config: Link) -> Result<(), Error> {
200 if sender == receiver {
202 return Err(Error::LinkingSelf);
203 }
204 if config.success_rate < 0.0 || config.success_rate > 1.0 {
205 return Err(Error::InvalidSuccessRate(config.success_rate));
206 }
207
208 let latency_ms = config.latency.as_secs_f64() * 1000.0;
210 let jitter_ms = config.jitter.as_secs_f64() * 1000.0;
211
212 let sampler = Normal::new(latency_ms, jitter_ms).unwrap();
214
215 self.sender
216 .0
217 .request(|result| Message::AddLink {
218 sender,
219 receiver,
220 sampler,
221 success_rate: config.success_rate,
222 result,
223 })
224 .await
225 .ok_or(Error::NetworkClosed)?
226 }
227
228 pub async fn remove_link(&self, sender: P, receiver: P) -> Result<(), Error> {
232 if sender == receiver {
234 return Err(Error::LinkingSelf);
235 }
236
237 self.sender
238 .0
239 .request(|result| Message::RemoveLink {
240 sender,
241 receiver,
242 result,
243 })
244 .await
245 .ok_or(Error::NetworkClosed)?
246 }
247
248 fn track(&self, id: u64, peers: TrackedPeers<P>) {
250 self.sender.0.send_lossy(Message::Track { id, peers });
251 }
252
253 async fn peer_set(&self, id: u64) -> Option<TrackedPeers<P>> {
255 self.sender
256 .0
257 .request(|response| Message::PeerSet { id, response })
258 .await
259 .flatten()
260 }
261
262 async fn subscribe(&self) -> PeerSetSubscription<P> {
264 self.sender
265 .0
266 .request(|response| Message::Subscribe { response })
267 .await
268 .unwrap_or_else(|| {
269 let (_, rx) = mpsc::unbounded_channel();
270 rx
271 })
272 }
273}
274
/// Peer set manager that forwards `crate::Provider` and `crate::Manager` calls to an [`Oracle`].
pub struct Manager<P: PublicKey, E: Clock> {
    /// Oracle that performs the actual requests.
    oracle: Oracle<P, E>,
}
282
283impl<P: PublicKey, E: Clock> std::fmt::Debug for Manager<P, E> {
284 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
285 f.debug_struct("Manager").finish_non_exhaustive()
286 }
287}
288
289impl<P: PublicKey, E: Clock> Clone for Manager<P, E> {
290 fn clone(&self) -> Self {
291 Self {
292 oracle: self.oracle.clone(),
293 }
294 }
295}
296
297impl<P: PublicKey, E: Clock> crate::Provider for Manager<P, E> {
298 type PublicKey = P;
299
300 async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
301 self.oracle.peer_set(id).await
302 }
303
304 async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
305 self.oracle.subscribe().await
306 }
307}
308
309impl<P: PublicKey, E: Clock> crate::Manager for Manager<P, E> {
310 async fn track<R>(&mut self, id: u64, peers: R)
311 where
312 R: Into<TrackedPeers<Self::PublicKey>> + Send,
313 {
314 self.oracle.track(id, peers.into());
315 }
316}
317
/// Peer set manager that accepts addressable peer sets and forwards only the peer keys
/// to an [`Oracle`].
pub struct SocketManager<P: PublicKey, E: Clock> {
    /// Oracle that performs the actual requests.
    oracle: Oracle<P, E>,
}
331
332impl<P: PublicKey, E: Clock> std::fmt::Debug for SocketManager<P, E> {
333 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
334 f.debug_struct("SocketManager").finish_non_exhaustive()
335 }
336}
337
338impl<P: PublicKey, E: Clock> Clone for SocketManager<P, E> {
339 fn clone(&self) -> Self {
340 Self {
341 oracle: self.oracle.clone(),
342 }
343 }
344}
345
346impl<P: PublicKey, E: Clock> crate::Provider for SocketManager<P, E> {
347 type PublicKey = P;
348
349 async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
350 self.oracle.peer_set(id).await
351 }
352
353 async fn subscribe(&mut self) -> PeerSetSubscription<P> {
354 self.oracle.subscribe().await
355 }
356}
357
358impl<P: PublicKey, E: Clock> crate::AddressableManager for SocketManager<P, E> {
359 async fn track<R>(&mut self, id: u64, peers: R)
360 where
361 R: Into<AddressableTrackedPeers<Self::PublicKey>> + Send,
362 {
363 let peers = peers.into();
365 self.oracle.track(
366 id,
367 TrackedPeers::new(peers.primary.into_keys(), peers.secondary.into_keys()),
368 );
369 }
370
371 async fn overwrite(&mut self, _peers: Map<Self::PublicKey, Address>) {
372 }
374}
375
/// Handle bound to a single peer identity, used to register channels and to block peers.
#[derive(Debug)]
pub struct Control<P: PublicKey, E: Clock> {
    /// Identity used as `public_key` when registering and as `from` when blocking.
    me: P,

    /// Mailbox over which [`Message`]s are sent.
    sender: UnboundedMailbox<Message<P, E>>,
}
385
386impl<P: PublicKey, E: Clock> Clone for Control<P, E> {
387 fn clone(&self) -> Self {
388 Self {
389 me: self.me.clone(),
390 sender: self.sender.clone(),
391 }
392 }
393}
394
395impl<P: PublicKey, E: Clock> Control<P, E> {
396 pub async fn register(
403 &self,
404 channel: Channel,
405 quota: Quota,
406 ) -> Result<(Sender<P, E>, Receiver<P>), Error> {
407 let public_key = self.me.clone();
408 self.sender
409 .0
410 .request(|result| Message::Register {
411 channel,
412 public_key,
413 quota,
414 result,
415 })
416 .await
417 .ok_or(Error::NetworkClosed)?
418 }
419}
420
421impl<P: PublicKey, E: Clock> crate::Blocker for Control<P, E> {
422 type PublicKey = P;
423
424 async fn block(&mut self, public_key: P) {
425 self.sender.0.send_lossy(Message::Block {
426 from: self.me.clone(),
427 to: public_key,
428 });
429 }
430}