1use super::{Error, Receiver, Sender};
2use crate::{Address, Channel};
3use commonware_cryptography::PublicKey;
4use commonware_runtime::{Clock, Quota};
5use commonware_utils::{
6 channels::ring,
7 ordered::{Map, Set},
8};
9use futures::{
10 channel::{mpsc, oneshot},
11 SinkExt,
12};
13use rand_distr::Normal;
14use std::time::Duration;
15
16pub enum Message<P: PublicKey, E: Clock> {
17 Register {
18 channel: Channel,
19 public_key: P,
20 quota: Quota,
21 #[allow(clippy::type_complexity)]
22 result: oneshot::Sender<Result<(Sender<P, E>, Receiver<P>), Error>>,
23 },
24 Update {
25 id: u64,
26 peers: Set<P>,
27 },
28 PeerSet {
29 id: u64,
30 response: oneshot::Sender<Option<Set<P>>>,
31 },
32 Subscribe {
33 sender: mpsc::UnboundedSender<(u64, Set<P>, Set<P>)>,
34 },
35 SubscribeConnected {
36 response: oneshot::Sender<ring::Receiver<Vec<P>>>,
37 },
38 LimitBandwidth {
39 public_key: P,
40 egress_cap: Option<usize>,
41 ingress_cap: Option<usize>,
42 result: oneshot::Sender<()>,
43 },
44 AddLink {
45 sender: P,
46 receiver: P,
47 sampler: Normal<f64>,
48 success_rate: f64,
49 result: oneshot::Sender<Result<(), Error>>,
50 },
51 RemoveLink {
52 sender: P,
53 receiver: P,
54 result: oneshot::Sender<Result<(), Error>>,
55 },
56 Block {
57 from: P,
59 to: P,
61 },
62 Blocked {
63 result: oneshot::Sender<Result<Vec<(P, P)>, Error>>,
64 },
65}
66
67#[derive(Clone)]
72pub struct Link {
73 pub latency: Duration,
75
76 pub jitter: Duration,
78
79 pub success_rate: f64,
81}
82
83#[derive(Debug)]
88pub struct Oracle<P: PublicKey, E: Clock> {
89 sender: mpsc::UnboundedSender<Message<P, E>>,
90}
91
92impl<P: PublicKey, E: Clock> Clone for Oracle<P, E> {
93 fn clone(&self) -> Self {
94 Self {
95 sender: self.sender.clone(),
96 }
97 }
98}
99
100impl<P: PublicKey, E: Clock> Oracle<P, E> {
101 pub(crate) const fn new(sender: mpsc::UnboundedSender<Message<P, E>>) -> Self {
103 Self { sender }
104 }
105
106 pub fn control(&self, me: P) -> Control<P, E> {
108 Control {
109 me,
110 sender: self.sender.clone(),
111 }
112 }
113
114 pub fn manager(&self) -> Manager<P, E> {
118 Manager {
119 oracle: self.clone(),
120 }
121 }
122
123 pub fn socket_manager(&self) -> SocketManager<P, E> {
127 SocketManager {
128 oracle: self.clone(),
129 }
130 }
131
132 pub async fn blocked(&mut self) -> Result<Vec<(P, P)>, Error> {
134 let (s, r) = oneshot::channel();
135 self.sender
136 .send(Message::Blocked { result: s })
137 .await
138 .map_err(|_| Error::NetworkClosed)?;
139 r.await.map_err(|_| Error::NetworkClosed)?
140 }
141
142 pub async fn limit_bandwidth(
149 &mut self,
150 public_key: P,
151 egress_cap: Option<usize>,
152 ingress_cap: Option<usize>,
153 ) -> Result<(), Error> {
154 let (sender, receiver) = oneshot::channel();
155 self.sender
156 .send(Message::LimitBandwidth {
157 public_key,
158 egress_cap,
159 ingress_cap,
160 result: sender,
161 })
162 .await
163 .map_err(|_| Error::NetworkClosed)?;
164 receiver.await.map_err(|_| Error::NetworkClosed)
165 }
166
167 pub async fn add_link(&mut self, sender: P, receiver: P, config: Link) -> Result<(), Error> {
174 if sender == receiver {
176 return Err(Error::LinkingSelf);
177 }
178 if config.success_rate < 0.0 || config.success_rate > 1.0 {
179 return Err(Error::InvalidSuccessRate(config.success_rate));
180 }
181
182 let latency_ms = config.latency.as_secs_f64() * 1000.0;
184 let jitter_ms = config.jitter.as_secs_f64() * 1000.0;
185
186 let sampler = Normal::new(latency_ms, jitter_ms).unwrap();
188
189 let (s, r) = oneshot::channel();
191 self.sender
192 .send(Message::AddLink {
193 sender,
194 receiver,
195 sampler,
196 success_rate: config.success_rate,
197 result: s,
198 })
199 .await
200 .map_err(|_| Error::NetworkClosed)?;
201 r.await.map_err(|_| Error::NetworkClosed)?
202 }
203
204 pub async fn remove_link(&mut self, sender: P, receiver: P) -> Result<(), Error> {
208 if sender == receiver {
210 return Err(Error::LinkingSelf);
211 }
212
213 let (s, r) = oneshot::channel();
215 self.sender
216 .send(Message::RemoveLink {
217 sender,
218 receiver,
219 result: s,
220 })
221 .await
222 .map_err(|_| Error::NetworkClosed)?;
223 r.await.map_err(|_| Error::NetworkClosed)?
224 }
225
226 async fn update(&mut self, id: u64, peers: Set<P>) {
228 let _ = self.sender.send(Message::Update { id, peers }).await;
229 }
230
231 async fn peer_set(&mut self, id: u64) -> Option<Set<P>> {
233 let (sender, receiver) = oneshot::channel();
234 self.sender
235 .send(Message::PeerSet {
236 id,
237 response: sender,
238 })
239 .await
240 .ok()?;
241 receiver.await.ok().flatten()
242 }
243
244 async fn subscribe(&mut self) -> mpsc::UnboundedReceiver<(u64, Set<P>, Set<P>)> {
246 let (sender, receiver) = mpsc::unbounded();
247 let _ = self.sender.send(Message::Subscribe { sender }).await;
248 receiver
249 }
250}
251
252pub struct Manager<P: PublicKey, E: Clock> {
256 oracle: Oracle<P, E>,
258}
259
260impl<P: PublicKey, E: Clock> std::fmt::Debug for Manager<P, E> {
261 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262 f.debug_struct("Manager").finish_non_exhaustive()
263 }
264}
265
266impl<P: PublicKey, E: Clock> Clone for Manager<P, E> {
267 fn clone(&self) -> Self {
268 Self {
269 oracle: self.oracle.clone(),
270 }
271 }
272}
273
274impl<P: PublicKey, E: Clock> crate::Manager for Manager<P, E> {
275 type PublicKey = P;
276 type Peers = Set<Self::PublicKey>;
277
278 async fn update(&mut self, id: u64, peers: Self::Peers) {
279 self.oracle.update(id, peers).await;
280 }
281
282 async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
283 self.oracle.peer_set(id).await
284 }
285
286 async fn subscribe(
287 &mut self,
288 ) -> mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)> {
289 self.oracle.subscribe().await
290 }
291}
292
293pub struct SocketManager<P: PublicKey, E: Clock> {
303 oracle: Oracle<P, E>,
305}
306
307impl<P: PublicKey, E: Clock> std::fmt::Debug for SocketManager<P, E> {
308 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
309 f.debug_struct("SocketManager").finish_non_exhaustive()
310 }
311}
312
313impl<P: PublicKey, E: Clock> Clone for SocketManager<P, E> {
314 fn clone(&self) -> Self {
315 Self {
316 oracle: self.oracle.clone(),
317 }
318 }
319}
320
321impl<P: PublicKey, E: Clock> crate::Manager for SocketManager<P, E> {
322 type PublicKey = P;
323 type Peers = Map<Self::PublicKey, Address>;
324
325 async fn update(&mut self, id: u64, peers: Self::Peers) {
326 self.oracle.update(id, peers.into_keys()).await;
328 }
329
330 async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
331 self.oracle.peer_set(id).await
332 }
333
334 async fn subscribe(
335 &mut self,
336 ) -> mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)> {
337 self.oracle.subscribe().await
338 }
339}
340
341#[derive(Debug)]
343pub struct Control<P: PublicKey, E: Clock> {
344 me: P,
346
347 sender: mpsc::UnboundedSender<Message<P, E>>,
349}
350
351impl<P: PublicKey, E: Clock> Clone for Control<P, E> {
352 fn clone(&self) -> Self {
353 Self {
354 me: self.me.clone(),
355 sender: self.sender.clone(),
356 }
357 }
358}
359
360impl<P: PublicKey, E: Clock> Control<P, E> {
361 pub async fn register(
368 &mut self,
369 channel: Channel,
370 quota: Quota,
371 ) -> Result<(Sender<P, E>, Receiver<P>), Error> {
372 let (tx, rx) = oneshot::channel();
373 self.sender
374 .send(Message::Register {
375 channel,
376 public_key: self.me.clone(),
377 quota,
378 result: tx,
379 })
380 .await
381 .map_err(|_| Error::NetworkClosed)?;
382 rx.await.map_err(|_| Error::NetworkClosed)?
383 }
384}
385
386impl<P: PublicKey, E: Clock> crate::Blocker for Control<P, E> {
387 type PublicKey = P;
388
389 async fn block(&mut self, public_key: P) {
390 let _ = self
391 .sender
392 .send(Message::Block {
393 from: self.me.clone(),
394 to: public_key,
395 })
396 .await;
397 }
398}