1use super::{Error, Receiver, Sender};
2use crate::{authenticated::UnboundedMailbox, Address, Channel};
3use commonware_cryptography::PublicKey;
4use commonware_runtime::{Clock, Quota};
5use commonware_utils::{
6 channel::{fallible::FallibleExt, mpsc, oneshot, ring},
7 ordered::{Map, Set},
8};
9use rand_distr::Normal;
10use std::time::Duration;
11
12pub enum Message<P: PublicKey, E: Clock> {
13 Register {
14 channel: Channel,
15 public_key: P,
16 quota: Quota,
17 #[allow(clippy::type_complexity)]
18 result: oneshot::Sender<Result<(Sender<P, E>, Receiver<P>), Error>>,
19 },
20 Track {
21 id: u64,
22 peers: Set<P>,
23 },
24 PeerSet {
25 id: u64,
26 response: oneshot::Sender<Option<Set<P>>>,
27 },
28 Subscribe {
29 sender: mpsc::UnboundedSender<(u64, Set<P>, Set<P>)>,
30 },
31 SubscribeConnected {
32 response: oneshot::Sender<ring::Receiver<Vec<P>>>,
33 },
34 LimitBandwidth {
35 public_key: P,
36 egress_cap: Option<usize>,
37 ingress_cap: Option<usize>,
38 result: oneshot::Sender<()>,
39 },
40 AddLink {
41 sender: P,
42 receiver: P,
43 sampler: Normal<f64>,
44 success_rate: f64,
45 result: oneshot::Sender<Result<(), Error>>,
46 },
47 RemoveLink {
48 sender: P,
49 receiver: P,
50 result: oneshot::Sender<Result<(), Error>>,
51 },
52 Block {
53 from: P,
55 to: P,
57 },
58 Blocked {
59 result: oneshot::Sender<Result<Vec<(P, P)>, Error>>,
60 },
61}
62
63impl<P: PublicKey, E: Clock> std::fmt::Debug for Message<P, E> {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 match self {
66 Self::Register { .. } => f.debug_struct("Register").finish_non_exhaustive(),
67 Self::Track { id, .. } => f
68 .debug_struct("Track")
69 .field("id", id)
70 .finish_non_exhaustive(),
71 Self::PeerSet { id, .. } => f
72 .debug_struct("PeerSet")
73 .field("id", id)
74 .finish_non_exhaustive(),
75 Self::Subscribe { .. } => f.debug_struct("Subscribe").finish_non_exhaustive(),
76 Self::SubscribeConnected { .. } => {
77 f.debug_struct("SubscribeConnected").finish_non_exhaustive()
78 }
79 Self::LimitBandwidth { .. } => f.debug_struct("LimitBandwidth").finish_non_exhaustive(),
80 Self::AddLink { .. } => f.debug_struct("AddLink").finish_non_exhaustive(),
81 Self::RemoveLink { .. } => f.debug_struct("RemoveLink").finish_non_exhaustive(),
82 Self::Block { from, to } => f
83 .debug_struct("Block")
84 .field("from", from)
85 .field("to", to)
86 .finish(),
87 Self::Blocked { .. } => f.debug_struct("Blocked").finish_non_exhaustive(),
88 }
89 }
90}
91
92#[derive(Clone)]
97pub struct Link {
98 pub latency: Duration,
100
101 pub jitter: Duration,
103
104 pub success_rate: f64,
106}
107
108#[derive(Debug)]
113pub struct Oracle<P: PublicKey, E: Clock> {
114 sender: UnboundedMailbox<Message<P, E>>,
115}
116
117impl<P: PublicKey, E: Clock> Clone for Oracle<P, E> {
118 fn clone(&self) -> Self {
119 Self {
120 sender: self.sender.clone(),
121 }
122 }
123}
124
125impl<P: PublicKey, E: Clock> Oracle<P, E> {
126 pub(crate) const fn new(sender: UnboundedMailbox<Message<P, E>>) -> Self {
128 Self { sender }
129 }
130
131 pub fn control(&self, me: P) -> Control<P, E> {
133 Control {
134 me,
135 sender: self.sender.clone(),
136 }
137 }
138
139 pub fn manager(&self) -> Manager<P, E> {
143 Manager {
144 oracle: self.clone(),
145 }
146 }
147
148 pub fn socket_manager(&self) -> SocketManager<P, E> {
152 SocketManager {
153 oracle: self.clone(),
154 }
155 }
156
157 pub async fn blocked(&self) -> Result<Vec<(P, P)>, Error> {
159 self.sender
160 .0
161 .request(|result| Message::Blocked { result })
162 .await
163 .ok_or(Error::NetworkClosed)?
164 }
165
166 pub async fn limit_bandwidth(
173 &self,
174 public_key: P,
175 egress_cap: Option<usize>,
176 ingress_cap: Option<usize>,
177 ) -> Result<(), Error> {
178 self.sender
179 .0
180 .request(|result| Message::LimitBandwidth {
181 public_key,
182 egress_cap,
183 ingress_cap,
184 result,
185 })
186 .await
187 .ok_or(Error::NetworkClosed)
188 }
189
190 pub async fn add_link(&self, sender: P, receiver: P, config: Link) -> Result<(), Error> {
197 if sender == receiver {
199 return Err(Error::LinkingSelf);
200 }
201 if config.success_rate < 0.0 || config.success_rate > 1.0 {
202 return Err(Error::InvalidSuccessRate(config.success_rate));
203 }
204
205 let latency_ms = config.latency.as_secs_f64() * 1000.0;
207 let jitter_ms = config.jitter.as_secs_f64() * 1000.0;
208
209 let sampler = Normal::new(latency_ms, jitter_ms).unwrap();
211
212 self.sender
213 .0
214 .request(|result| Message::AddLink {
215 sender,
216 receiver,
217 sampler,
218 success_rate: config.success_rate,
219 result,
220 })
221 .await
222 .ok_or(Error::NetworkClosed)?
223 }
224
225 pub async fn remove_link(&self, sender: P, receiver: P) -> Result<(), Error> {
229 if sender == receiver {
231 return Err(Error::LinkingSelf);
232 }
233
234 self.sender
235 .0
236 .request(|result| Message::RemoveLink {
237 sender,
238 receiver,
239 result,
240 })
241 .await
242 .ok_or(Error::NetworkClosed)?
243 }
244
245 async fn track(&self, id: u64, peers: Set<P>) {
247 self.sender.0.send_lossy(Message::Track { id, peers });
248 }
249
250 async fn peer_set(&self, id: u64) -> Option<Set<P>> {
252 self.sender
253 .0
254 .request(|response| Message::PeerSet { id, response })
255 .await
256 .flatten()
257 }
258
259 async fn subscribe(&self) -> mpsc::UnboundedReceiver<(u64, Set<P>, Set<P>)> {
261 let (sender, receiver) = mpsc::unbounded_channel();
262 self.sender.0.send_lossy(Message::Subscribe { sender });
263 receiver
264 }
265}
266
267pub struct Manager<P: PublicKey, E: Clock> {
271 oracle: Oracle<P, E>,
273}
274
275impl<P: PublicKey, E: Clock> std::fmt::Debug for Manager<P, E> {
276 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
277 f.debug_struct("Manager").finish_non_exhaustive()
278 }
279}
280
281impl<P: PublicKey, E: Clock> Clone for Manager<P, E> {
282 fn clone(&self) -> Self {
283 Self {
284 oracle: self.oracle.clone(),
285 }
286 }
287}
288
289impl<P: PublicKey, E: Clock> crate::Provider for Manager<P, E> {
290 type PublicKey = P;
291
292 async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
293 self.oracle.peer_set(id).await
294 }
295
296 async fn subscribe(
297 &mut self,
298 ) -> mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)> {
299 self.oracle.subscribe().await
300 }
301}
302
303impl<P: PublicKey, E: Clock> crate::Manager for Manager<P, E> {
304 async fn track(&mut self, id: u64, peers: Set<Self::PublicKey>) {
305 self.oracle.track(id, peers).await;
306 }
307}
308
309pub struct SocketManager<P: PublicKey, E: Clock> {
319 oracle: Oracle<P, E>,
321}
322
323impl<P: PublicKey, E: Clock> std::fmt::Debug for SocketManager<P, E> {
324 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
325 f.debug_struct("SocketManager").finish_non_exhaustive()
326 }
327}
328
329impl<P: PublicKey, E: Clock> Clone for SocketManager<P, E> {
330 fn clone(&self) -> Self {
331 Self {
332 oracle: self.oracle.clone(),
333 }
334 }
335}
336
337impl<P: PublicKey, E: Clock> crate::Provider for SocketManager<P, E> {
338 type PublicKey = P;
339
340 async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
341 self.oracle.peer_set(id).await
342 }
343
344 async fn subscribe(
345 &mut self,
346 ) -> mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)> {
347 self.oracle.subscribe().await
348 }
349}
350
351impl<P: PublicKey, E: Clock> crate::AddressableManager for SocketManager<P, E> {
352 async fn track(&mut self, id: u64, peers: Map<Self::PublicKey, Address>) {
353 self.oracle.track(id, peers.into_keys()).await;
355 }
356
357 async fn overwrite(&mut self, _peers: Map<Self::PublicKey, Address>) {
358 }
360}
361
362#[derive(Debug)]
364pub struct Control<P: PublicKey, E: Clock> {
365 me: P,
367
368 sender: UnboundedMailbox<Message<P, E>>,
370}
371
372impl<P: PublicKey, E: Clock> Clone for Control<P, E> {
373 fn clone(&self) -> Self {
374 Self {
375 me: self.me.clone(),
376 sender: self.sender.clone(),
377 }
378 }
379}
380
381impl<P: PublicKey, E: Clock> Control<P, E> {
382 pub async fn register(
389 &self,
390 channel: Channel,
391 quota: Quota,
392 ) -> Result<(Sender<P, E>, Receiver<P>), Error> {
393 let public_key = self.me.clone();
394 self.sender
395 .0
396 .request(|result| Message::Register {
397 channel,
398 public_key,
399 quota,
400 result,
401 })
402 .await
403 .ok_or(Error::NetworkClosed)?
404 }
405}
406
407impl<P: PublicKey, E: Clock> crate::Blocker for Control<P, E> {
408 type PublicKey = P;
409
410 async fn block(&mut self, public_key: P) {
411 self.sender.0.send_lossy(Message::Block {
412 from: self.me.clone(),
413 to: public_key,
414 });
415 }
416}