1use super::{Error, Receiver, Sender};
2use crate::{authenticated::UnboundedMailbox, Address, Channel, PeerSetSubscription};
3use commonware_cryptography::PublicKey;
4use commonware_runtime::{Clock, Quota};
5use commonware_utils::{
6 channel::{fallible::FallibleExt, mpsc, oneshot, ring},
7 ordered::{Map, Set},
8};
9use rand_distr::Normal;
10use std::time::Duration;
11
/// Requests sent over the [`UnboundedMailbox`] held by [`Oracle`] and [`Control`].
///
/// Variants that expect an answer carry a [`oneshot::Sender`] on which the
/// consumer of the mailbox replies. Variants without one (`Track`, `Block`)
/// are sent without waiting for a reply.
pub enum Message<P: PublicKey, E: Clock> {
    /// Register `public_key` on `channel` (sent by [`Control::register`]).
    Register {
        /// Channel to register on.
        channel: Channel,
        /// Identity of the registering peer.
        public_key: P,
        /// Quota forwarded unchanged from the caller of [`Control::register`].
        quota: Quota,
        /// Reply slot for the resulting sender/receiver pair, or an error.
        #[allow(clippy::type_complexity)]
        result: oneshot::Sender<Result<(Sender<P, E>, Receiver<P>), Error>>,
    },
    /// Record the peer set `peers` under `id` (sent by `Oracle::track`).
    Track {
        /// Identifier of the peer set.
        id: u64,
        /// Members of the peer set.
        peers: Set<P>,
    },
    /// Look up the peer set stored under `id` (sent by `Oracle::peer_set`).
    PeerSet {
        /// Identifier of the peer set to look up.
        id: u64,
        /// Reply slot; `None` presumably means no set is known for `id` — confirm
        /// against the handler.
        response: oneshot::Sender<Option<Set<P>>>,
    },
    /// Request a peer-set subscription (sent by `Oracle::subscribe`).
    Subscribe {
        /// Reply slot for the subscription.
        response: oneshot::Sender<PeerSetSubscription<P>>,
    },
    /// Request a [`ring::Receiver`] yielding `Vec<P>` values.
    ///
    /// NOTE(review): no sender of this variant is visible in this file;
    /// presumably each item lists the currently connected peers — confirm
    /// against the handler.
    SubscribeConnected {
        /// Reply slot for the receiver.
        response: oneshot::Sender<ring::Receiver<Vec<P>>>,
    },
    /// Set bandwidth caps for `public_key` (sent by [`Oracle::limit_bandwidth`]).
    LimitBandwidth {
        /// Peer whose caps are being set.
        public_key: P,
        /// Egress cap; units and the meaning of `None` are not visible here
        /// (presumably `None` = unlimited — TODO confirm).
        egress_cap: Option<usize>,
        /// Ingress cap; same caveats as `egress_cap`.
        ingress_cap: Option<usize>,
        /// Reply slot signalled once the request has been handled.
        result: oneshot::Sender<()>,
    },
    /// Add a link from `sender` to `receiver` (sent by [`Oracle::add_link`]).
    AddLink {
        /// Source peer of the link.
        sender: P,
        /// Destination peer of the link.
        receiver: P,
        /// Latency distribution in milliseconds, built by [`Oracle::add_link`]
        /// from [`Link::latency`] (mean) and [`Link::jitter`] (standard deviation).
        sampler: Normal<f64>,
        /// Copied from [`Link::success_rate`].
        success_rate: f64,
        /// Reply slot for the outcome.
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Remove the link from `sender` to `receiver` (sent by [`Oracle::remove_link`]).
    RemoveLink {
        /// Source peer of the link.
        sender: P,
        /// Destination peer of the link.
        receiver: P,
        /// Reply slot for the outcome.
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Peer `from` blocks peer `to` (sent by the `crate::Blocker` impl of
    /// [`Control`]). No reply is sent.
    Block {
        /// Peer issuing the block.
        from: P,
        /// Peer being blocked.
        to: P,
    },
    /// Request the list of blocked pairs (sent by [`Oracle::blocked`]).
    Blocked {
        /// Reply slot; the pairs are presumably `(from, to)` as recorded by
        /// `Block` — confirm against the handler.
        result: oneshot::Sender<Result<Vec<(P, P)>, Error>>,
    },
}
62
63impl<P: PublicKey, E: Clock> std::fmt::Debug for Message<P, E> {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 match self {
66 Self::Register { .. } => f.debug_struct("Register").finish_non_exhaustive(),
67 Self::Track { id, .. } => f
68 .debug_struct("Track")
69 .field("id", id)
70 .finish_non_exhaustive(),
71 Self::PeerSet { id, .. } => f
72 .debug_struct("PeerSet")
73 .field("id", id)
74 .finish_non_exhaustive(),
75 Self::Subscribe { .. } => f.debug_struct("Subscribe").finish_non_exhaustive(),
76 Self::SubscribeConnected { .. } => {
77 f.debug_struct("SubscribeConnected").finish_non_exhaustive()
78 }
79 Self::LimitBandwidth { .. } => f.debug_struct("LimitBandwidth").finish_non_exhaustive(),
80 Self::AddLink { .. } => f.debug_struct("AddLink").finish_non_exhaustive(),
81 Self::RemoveLink { .. } => f.debug_struct("RemoveLink").finish_non_exhaustive(),
82 Self::Block { from, to } => f
83 .debug_struct("Block")
84 .field("from", from)
85 .field("to", to)
86 .finish(),
87 Self::Blocked { .. } => f.debug_struct("Blocked").finish_non_exhaustive(),
88 }
89 }
90}
91
/// Parameters of a link installed with [`Oracle::add_link`].
#[derive(Clone)]
pub struct Link {
    /// Mean latency. [`Oracle::add_link`] converts it to milliseconds and uses
    /// it as the mean of a normal distribution.
    pub latency: Duration,

    /// Latency jitter. [`Oracle::add_link`] converts it to milliseconds and
    /// uses it as the standard deviation of the same normal distribution.
    pub jitter: Duration,

    /// Presumably the probability that a message on the link is delivered.
    /// [`Oracle::add_link`] rejects values outside `[0.0, 1.0]` with
    /// `Error::InvalidSuccessRate`.
    pub success_rate: f64,
}
107
/// Handle that sends [`Message`] requests over an [`UnboundedMailbox`].
///
/// Also acts as a factory for [`Control`], [`Manager`] and [`SocketManager`]
/// handles.
#[derive(Debug)]
pub struct Oracle<P: PublicKey, E: Clock> {
    /// Mailbox on which every request is sent.
    sender: UnboundedMailbox<Message<P, E>>,
}
116
117impl<P: PublicKey, E: Clock> Clone for Oracle<P, E> {
118 fn clone(&self) -> Self {
119 Self {
120 sender: self.sender.clone(),
121 }
122 }
123}
124
125impl<P: PublicKey, E: Clock> Oracle<P, E> {
126 pub(crate) const fn new(sender: UnboundedMailbox<Message<P, E>>) -> Self {
128 Self { sender }
129 }
130
131 pub fn control(&self, me: P) -> Control<P, E> {
133 Control {
134 me,
135 sender: self.sender.clone(),
136 }
137 }
138
139 pub fn manager(&self) -> Manager<P, E> {
143 Manager {
144 oracle: self.clone(),
145 }
146 }
147
148 pub fn socket_manager(&self) -> SocketManager<P, E> {
152 SocketManager {
153 oracle: self.clone(),
154 }
155 }
156
157 pub async fn blocked(&self) -> Result<Vec<(P, P)>, Error> {
159 self.sender
160 .0
161 .request(|result| Message::Blocked { result })
162 .await
163 .ok_or(Error::NetworkClosed)?
164 }
165
166 pub async fn limit_bandwidth(
173 &self,
174 public_key: P,
175 egress_cap: Option<usize>,
176 ingress_cap: Option<usize>,
177 ) -> Result<(), Error> {
178 self.sender
179 .0
180 .request(|result| Message::LimitBandwidth {
181 public_key,
182 egress_cap,
183 ingress_cap,
184 result,
185 })
186 .await
187 .ok_or(Error::NetworkClosed)
188 }
189
190 pub async fn add_link(&self, sender: P, receiver: P, config: Link) -> Result<(), Error> {
197 if sender == receiver {
199 return Err(Error::LinkingSelf);
200 }
201 if config.success_rate < 0.0 || config.success_rate > 1.0 {
202 return Err(Error::InvalidSuccessRate(config.success_rate));
203 }
204
205 let latency_ms = config.latency.as_secs_f64() * 1000.0;
207 let jitter_ms = config.jitter.as_secs_f64() * 1000.0;
208
209 let sampler = Normal::new(latency_ms, jitter_ms).unwrap();
211
212 self.sender
213 .0
214 .request(|result| Message::AddLink {
215 sender,
216 receiver,
217 sampler,
218 success_rate: config.success_rate,
219 result,
220 })
221 .await
222 .ok_or(Error::NetworkClosed)?
223 }
224
225 pub async fn remove_link(&self, sender: P, receiver: P) -> Result<(), Error> {
229 if sender == receiver {
231 return Err(Error::LinkingSelf);
232 }
233
234 self.sender
235 .0
236 .request(|result| Message::RemoveLink {
237 sender,
238 receiver,
239 result,
240 })
241 .await
242 .ok_or(Error::NetworkClosed)?
243 }
244
245 fn track(&self, id: u64, peers: Set<P>) {
247 self.sender.0.send_lossy(Message::Track { id, peers });
248 }
249
250 async fn peer_set(&self, id: u64) -> Option<Set<P>> {
252 self.sender
253 .0
254 .request(|response| Message::PeerSet { id, response })
255 .await
256 .flatten()
257 }
258
259 async fn subscribe(&self) -> PeerSetSubscription<P> {
261 self.sender
262 .0
263 .request(|response| Message::Subscribe { response })
264 .await
265 .unwrap_or_else(|| {
266 let (_, rx) = mpsc::unbounded_channel();
267 rx
268 })
269 }
270}
271
/// Wrapper around an [`Oracle`] that implements `crate::Provider` and
/// `crate::Manager` by forwarding every call to it.
pub struct Manager<P: PublicKey, E: Clock> {
    /// Oracle to which every call is forwarded.
    oracle: Oracle<P, E>,
}
279
280impl<P: PublicKey, E: Clock> std::fmt::Debug for Manager<P, E> {
281 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282 f.debug_struct("Manager").finish_non_exhaustive()
283 }
284}
285
286impl<P: PublicKey, E: Clock> Clone for Manager<P, E> {
287 fn clone(&self) -> Self {
288 Self {
289 oracle: self.oracle.clone(),
290 }
291 }
292}
293
294impl<P: PublicKey, E: Clock> crate::Provider for Manager<P, E> {
295 type PublicKey = P;
296
297 async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
298 self.oracle.peer_set(id).await
299 }
300
301 async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
302 self.oracle.subscribe().await
303 }
304}
305
306impl<P: PublicKey, E: Clock> crate::Manager for Manager<P, E> {
307 async fn track(&mut self, id: u64, peers: Set<Self::PublicKey>) {
308 self.oracle.track(id, peers);
309 }
310}
311
/// Wrapper around an [`Oracle`] that implements `crate::Provider` and
/// `crate::AddressableManager`.
///
/// Addresses supplied through `crate::AddressableManager` are not used: only
/// the keys of the tracked map are forwarded to the oracle.
pub struct SocketManager<P: PublicKey, E: Clock> {
    /// Oracle to which every call is forwarded.
    oracle: Oracle<P, E>,
}
325
326impl<P: PublicKey, E: Clock> std::fmt::Debug for SocketManager<P, E> {
327 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
328 f.debug_struct("SocketManager").finish_non_exhaustive()
329 }
330}
331
332impl<P: PublicKey, E: Clock> Clone for SocketManager<P, E> {
333 fn clone(&self) -> Self {
334 Self {
335 oracle: self.oracle.clone(),
336 }
337 }
338}
339
340impl<P: PublicKey, E: Clock> crate::Provider for SocketManager<P, E> {
341 type PublicKey = P;
342
343 async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
344 self.oracle.peer_set(id).await
345 }
346
347 async fn subscribe(&mut self) -> PeerSetSubscription<P> {
348 self.oracle.subscribe().await
349 }
350}
351
352impl<P: PublicKey, E: Clock> crate::AddressableManager for SocketManager<P, E> {
353 async fn track(&mut self, id: u64, peers: Map<Self::PublicKey, Address>) {
354 self.oracle.track(id, peers.into_keys());
356 }
357
358 async fn overwrite(&mut self, _peers: Map<Self::PublicKey, Address>) {
359 }
361}
362
/// Per-peer handle created by [`Oracle::control`].
///
/// Registers channels for the peer `me` and, through its `crate::Blocker`
/// impl, sends block requests on that peer's behalf.
#[derive(Debug)]
pub struct Control<P: PublicKey, E: Clock> {
    /// Identity of the peer this control acts for.
    me: P,

    /// Mailbox on which every request is sent.
    sender: UnboundedMailbox<Message<P, E>>,
}
372
373impl<P: PublicKey, E: Clock> Clone for Control<P, E> {
374 fn clone(&self) -> Self {
375 Self {
376 me: self.me.clone(),
377 sender: self.sender.clone(),
378 }
379 }
380}
381
382impl<P: PublicKey, E: Clock> Control<P, E> {
383 pub async fn register(
390 &self,
391 channel: Channel,
392 quota: Quota,
393 ) -> Result<(Sender<P, E>, Receiver<P>), Error> {
394 let public_key = self.me.clone();
395 self.sender
396 .0
397 .request(|result| Message::Register {
398 channel,
399 public_key,
400 quota,
401 result,
402 })
403 .await
404 .ok_or(Error::NetworkClosed)?
405 }
406}
407
408impl<P: PublicKey, E: Clock> crate::Blocker for Control<P, E> {
409 type PublicKey = P;
410
411 async fn block(&mut self, public_key: P) {
412 self.sender.0.send_lossy(Message::Block {
413 from: self.me.clone(),
414 to: public_key,
415 });
416 }
417}