1use super::{Error, Receiver, Sender};
2use crate::{authenticated::UnboundedMailbox, Address, Channel};
3use commonware_cryptography::PublicKey;
4use commonware_runtime::{Clock, Quota};
5use commonware_utils::{
6 channels::{fallible::FallibleExt, ring},
7 ordered::{Map, Set},
8};
9use futures::channel::{mpsc, oneshot};
10use rand_distr::Normal;
11use std::time::Duration;
12
13pub enum Message<P: PublicKey, E: Clock> {
14 Register {
15 channel: Channel,
16 public_key: P,
17 quota: Quota,
18 #[allow(clippy::type_complexity)]
19 result: oneshot::Sender<Result<(Sender<P, E>, Receiver<P>), Error>>,
20 },
21 Update {
22 id: u64,
23 peers: Set<P>,
24 },
25 PeerSet {
26 id: u64,
27 response: oneshot::Sender<Option<Set<P>>>,
28 },
29 Subscribe {
30 sender: mpsc::UnboundedSender<(u64, Set<P>, Set<P>)>,
31 },
32 SubscribeConnected {
33 response: oneshot::Sender<ring::Receiver<Vec<P>>>,
34 },
35 LimitBandwidth {
36 public_key: P,
37 egress_cap: Option<usize>,
38 ingress_cap: Option<usize>,
39 result: oneshot::Sender<()>,
40 },
41 AddLink {
42 sender: P,
43 receiver: P,
44 sampler: Normal<f64>,
45 success_rate: f64,
46 result: oneshot::Sender<Result<(), Error>>,
47 },
48 RemoveLink {
49 sender: P,
50 receiver: P,
51 result: oneshot::Sender<Result<(), Error>>,
52 },
53 Block {
54 from: P,
56 to: P,
58 },
59 Blocked {
60 result: oneshot::Sender<Result<Vec<(P, P)>, Error>>,
61 },
62}
63
64impl<P: PublicKey, E: Clock> std::fmt::Debug for Message<P, E> {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 match self {
67 Self::Register { .. } => f.debug_struct("Register").finish_non_exhaustive(),
68 Self::Update { id, .. } => f
69 .debug_struct("Update")
70 .field("id", id)
71 .finish_non_exhaustive(),
72 Self::PeerSet { id, .. } => f
73 .debug_struct("PeerSet")
74 .field("id", id)
75 .finish_non_exhaustive(),
76 Self::Subscribe { .. } => f.debug_struct("Subscribe").finish_non_exhaustive(),
77 Self::SubscribeConnected { .. } => {
78 f.debug_struct("SubscribeConnected").finish_non_exhaustive()
79 }
80 Self::LimitBandwidth { .. } => f.debug_struct("LimitBandwidth").finish_non_exhaustive(),
81 Self::AddLink { .. } => f.debug_struct("AddLink").finish_non_exhaustive(),
82 Self::RemoveLink { .. } => f.debug_struct("RemoveLink").finish_non_exhaustive(),
83 Self::Block { from, to } => f
84 .debug_struct("Block")
85 .field("from", from)
86 .field("to", to)
87 .finish(),
88 Self::Blocked { .. } => f.debug_struct("Blocked").finish_non_exhaustive(),
89 }
90 }
91}
92
/// Parameters of a link, as consumed by `Oracle::add_link`.
///
/// `Debug` is derived alongside `Clone` so a configuration can be logged or shown
/// in assertion failures.
#[derive(Clone, Debug)]
pub struct Link {
    /// Mean of the normal distribution that `Oracle::add_link` builds for the link
    /// (converted to milliseconds there).
    pub latency: Duration,

    /// Standard deviation of that same distribution (also converted to
    /// milliseconds).
    pub jitter: Duration,

    /// Rate in `0.0..=1.0`; `Oracle::add_link` rejects values outside that range.
    ///
    /// NOTE(review): presumably the probability that a message is delivered —
    /// confirm against the code that consumes the link.
    pub success_rate: f64,
}
108
109#[derive(Debug)]
114pub struct Oracle<P: PublicKey, E: Clock> {
115 sender: UnboundedMailbox<Message<P, E>>,
116}
117
118impl<P: PublicKey, E: Clock> Clone for Oracle<P, E> {
119 fn clone(&self) -> Self {
120 Self {
121 sender: self.sender.clone(),
122 }
123 }
124}
125
126impl<P: PublicKey, E: Clock> Oracle<P, E> {
127 pub(crate) const fn new(sender: UnboundedMailbox<Message<P, E>>) -> Self {
129 Self { sender }
130 }
131
132 pub fn control(&self, me: P) -> Control<P, E> {
134 Control {
135 me,
136 sender: self.sender.clone(),
137 }
138 }
139
140 pub fn manager(&self) -> Manager<P, E> {
144 Manager {
145 oracle: self.clone(),
146 }
147 }
148
149 pub fn socket_manager(&self) -> SocketManager<P, E> {
153 SocketManager {
154 oracle: self.clone(),
155 }
156 }
157
158 pub async fn blocked(&self) -> Result<Vec<(P, P)>, Error> {
160 self.sender
161 .0
162 .request(|result| Message::Blocked { result })
163 .await
164 .ok_or(Error::NetworkClosed)?
165 }
166
167 pub async fn limit_bandwidth(
174 &self,
175 public_key: P,
176 egress_cap: Option<usize>,
177 ingress_cap: Option<usize>,
178 ) -> Result<(), Error> {
179 self.sender
180 .0
181 .request(|result| Message::LimitBandwidth {
182 public_key,
183 egress_cap,
184 ingress_cap,
185 result,
186 })
187 .await
188 .ok_or(Error::NetworkClosed)
189 }
190
191 pub async fn add_link(&self, sender: P, receiver: P, config: Link) -> Result<(), Error> {
198 if sender == receiver {
200 return Err(Error::LinkingSelf);
201 }
202 if config.success_rate < 0.0 || config.success_rate > 1.0 {
203 return Err(Error::InvalidSuccessRate(config.success_rate));
204 }
205
206 let latency_ms = config.latency.as_secs_f64() * 1000.0;
208 let jitter_ms = config.jitter.as_secs_f64() * 1000.0;
209
210 let sampler = Normal::new(latency_ms, jitter_ms).unwrap();
212
213 self.sender
214 .0
215 .request(|result| Message::AddLink {
216 sender,
217 receiver,
218 sampler,
219 success_rate: config.success_rate,
220 result,
221 })
222 .await
223 .ok_or(Error::NetworkClosed)?
224 }
225
226 pub async fn remove_link(&self, sender: P, receiver: P) -> Result<(), Error> {
230 if sender == receiver {
232 return Err(Error::LinkingSelf);
233 }
234
235 self.sender
236 .0
237 .request(|result| Message::RemoveLink {
238 sender,
239 receiver,
240 result,
241 })
242 .await
243 .ok_or(Error::NetworkClosed)?
244 }
245
246 async fn update(&self, id: u64, peers: Set<P>) {
248 self.sender.0.send_lossy(Message::Update { id, peers });
249 }
250
251 async fn peer_set(&self, id: u64) -> Option<Set<P>> {
253 self.sender
254 .0
255 .request(|response| Message::PeerSet { id, response })
256 .await
257 .flatten()
258 }
259
260 async fn subscribe(&self) -> mpsc::UnboundedReceiver<(u64, Set<P>, Set<P>)> {
262 let (sender, receiver) = mpsc::unbounded();
263 self.sender.0.send_lossy(Message::Subscribe { sender });
264 receiver
265 }
266}
267
268pub struct Manager<P: PublicKey, E: Clock> {
272 oracle: Oracle<P, E>,
274}
275
276impl<P: PublicKey, E: Clock> std::fmt::Debug for Manager<P, E> {
277 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278 f.debug_struct("Manager").finish_non_exhaustive()
279 }
280}
281
282impl<P: PublicKey, E: Clock> Clone for Manager<P, E> {
283 fn clone(&self) -> Self {
284 Self {
285 oracle: self.oracle.clone(),
286 }
287 }
288}
289
290impl<P: PublicKey, E: Clock> crate::Manager for Manager<P, E> {
291 type PublicKey = P;
292 type Peers = Set<Self::PublicKey>;
293
294 async fn update(&mut self, id: u64, peers: Self::Peers) {
295 self.oracle.update(id, peers).await;
296 }
297
298 async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
299 self.oracle.peer_set(id).await
300 }
301
302 async fn subscribe(
303 &mut self,
304 ) -> mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)> {
305 self.oracle.subscribe().await
306 }
307}
308
309pub struct SocketManager<P: PublicKey, E: Clock> {
319 oracle: Oracle<P, E>,
321}
322
323impl<P: PublicKey, E: Clock> std::fmt::Debug for SocketManager<P, E> {
324 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
325 f.debug_struct("SocketManager").finish_non_exhaustive()
326 }
327}
328
329impl<P: PublicKey, E: Clock> Clone for SocketManager<P, E> {
330 fn clone(&self) -> Self {
331 Self {
332 oracle: self.oracle.clone(),
333 }
334 }
335}
336
337impl<P: PublicKey, E: Clock> crate::Manager for SocketManager<P, E> {
338 type PublicKey = P;
339 type Peers = Map<Self::PublicKey, Address>;
340
341 async fn update(&mut self, id: u64, peers: Self::Peers) {
342 self.oracle.update(id, peers.into_keys()).await;
344 }
345
346 async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
347 self.oracle.peer_set(id).await
348 }
349
350 async fn subscribe(
351 &mut self,
352 ) -> mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)> {
353 self.oracle.subscribe().await
354 }
355}
356
357#[derive(Debug)]
359pub struct Control<P: PublicKey, E: Clock> {
360 me: P,
362
363 sender: UnboundedMailbox<Message<P, E>>,
365}
366
367impl<P: PublicKey, E: Clock> Clone for Control<P, E> {
368 fn clone(&self) -> Self {
369 Self {
370 me: self.me.clone(),
371 sender: self.sender.clone(),
372 }
373 }
374}
375
376impl<P: PublicKey, E: Clock> Control<P, E> {
377 pub async fn register(
384 &self,
385 channel: Channel,
386 quota: Quota,
387 ) -> Result<(Sender<P, E>, Receiver<P>), Error> {
388 let public_key = self.me.clone();
389 self.sender
390 .0
391 .request(|result| Message::Register {
392 channel,
393 public_key,
394 quota,
395 result,
396 })
397 .await
398 .ok_or(Error::NetworkClosed)?
399 }
400}
401
402impl<P: PublicKey, E: Clock> crate::Blocker for Control<P, E> {
403 type PublicKey = P;
404
405 async fn block(&mut self, public_key: P) {
406 self.sender.0.send_lossy(Message::Block {
407 from: self.me.clone(),
408 to: public_key,
409 });
410 }
411}