1use super::{Error, Receiver, Sender};
2use crate::{
3 Address, AddressableTrackedPeers, Channel, PeerSetSubscription, Recipients, TrackedPeers,
4};
5use commonware_actor::Feedback;
6use commonware_cryptography::PublicKey;
7use commonware_runtime::{Clock, IoBuf, Quota};
8use commonware_utils::{
9 channel::{fallible::FallibleExt, mpsc, oneshot, ring},
10 ordered::Map,
11};
12use rand_distr::Normal;
13use std::time::Duration;
14
15pub enum Message<P: PublicKey, E: Clock> {
16 Register {
17 channel: Channel,
18 public_key: P,
19 quota: Quota,
20 #[allow(clippy::type_complexity)]
21 result: oneshot::Sender<Result<(Sender<P, E>, Receiver<P>), Error>>,
22 },
23 Send {
24 channel: Channel,
25 origin: P,
26 recipients: Recipients<P>,
27 message: IoBuf,
28 priority: bool,
29 },
30 Track {
31 id: u64,
32 peers: TrackedPeers<P>,
33 },
34 PeerSet {
35 id: u64,
36 response: oneshot::Sender<Option<TrackedPeers<P>>>,
37 },
38 Subscribe {
39 response: oneshot::Sender<PeerSetSubscription<P>>,
40 },
41 SubscribePeers {
42 exclude: P,
43 sender: ring::Sender<Vec<P>>,
44 },
45 LimitBandwidth {
46 public_key: P,
47 egress_cap: Option<usize>,
48 ingress_cap: Option<usize>,
49 result: oneshot::Sender<()>,
50 },
51 AddLink {
52 sender: P,
53 receiver: P,
54 sampler: Normal<f64>,
55 success_rate: f64,
56 result: oneshot::Sender<Result<(), Error>>,
57 },
58 RemoveLink {
59 sender: P,
60 receiver: P,
61 result: oneshot::Sender<Result<(), Error>>,
62 },
63 Block {
64 from: P,
66 to: P,
68 },
69 Blocked {
70 result: oneshot::Sender<Result<Vec<(P, P)>, Error>>,
71 },
72}
73
74impl<P: PublicKey, E: Clock> std::fmt::Debug for Message<P, E> {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 match self {
77 Self::Register { .. } => f.debug_struct("Register").finish_non_exhaustive(),
78 Self::Send { .. } => f.debug_struct("Send").finish_non_exhaustive(),
79 Self::Track { id, .. } => f
80 .debug_struct("Track")
81 .field("id", id)
82 .finish_non_exhaustive(),
83 Self::PeerSet { id, .. } => f
84 .debug_struct("PeerSet")
85 .field("id", id)
86 .finish_non_exhaustive(),
87 Self::Subscribe { .. } => f.debug_struct("Subscribe").finish_non_exhaustive(),
88 Self::SubscribePeers { exclude, .. } => f
89 .debug_struct("SubscribePeers")
90 .field("exclude", exclude)
91 .finish_non_exhaustive(),
92 Self::LimitBandwidth { .. } => f.debug_struct("LimitBandwidth").finish_non_exhaustive(),
93 Self::AddLink { .. } => f.debug_struct("AddLink").finish_non_exhaustive(),
94 Self::RemoveLink { .. } => f.debug_struct("RemoveLink").finish_non_exhaustive(),
95 Self::Block { from, to } => f
96 .debug_struct("Block")
97 .field("from", from)
98 .field("to", to)
99 .finish(),
100 Self::Blocked { .. } => f.debug_struct("Blocked").finish_non_exhaustive(),
101 }
102 }
103}
104
105fn enqueue<P: PublicKey, E: Clock>(
106 sender: &mpsc::UnboundedSender<Message<P, E>>,
107 message: Message<P, E>,
108) -> Feedback {
109 if sender.send_lossy(message) {
110 Feedback::Ok
111 } else {
112 Feedback::Closed
113 }
114}
115
116async fn request<P, E, R, F>(
117 sender: &mpsc::UnboundedSender<Message<P, E>>,
118 make_msg: F,
119) -> Option<R>
120where
121 P: PublicKey,
122 E: Clock,
123 R: Send,
124 F: FnOnce(oneshot::Sender<R>) -> Message<P, E> + Send,
125{
126 sender.request(make_msg).await
127}
128
/// Describes the behaviour of a one-way link between two peers.
///
/// `Oracle::add_link` converts `latency` and `jitter` to milliseconds and uses
/// them as the mean and standard deviation of a normal distribution.
#[derive(Clone, Debug)]
pub struct Link {
    /// Mean latency of the link.
    pub latency: Duration,

    /// Standard deviation of the latency.
    pub jitter: Duration,

    /// Success rate of the link; `Oracle::add_link` rejects values outside `[0.0, 1.0]`.
    pub success_rate: f64,
}
144
145#[derive(Debug)]
150pub struct Oracle<P: PublicKey, E: Clock> {
151 sender: mpsc::UnboundedSender<Message<P, E>>,
152}
153
154impl<P: PublicKey, E: Clock> Clone for Oracle<P, E> {
155 fn clone(&self) -> Self {
156 Self {
157 sender: self.sender.clone(),
158 }
159 }
160}
161
162impl<P: PublicKey, E: Clock> Oracle<P, E> {
163 pub(crate) const fn new(sender: mpsc::UnboundedSender<Message<P, E>>) -> Self {
165 Self { sender }
166 }
167
168 pub fn control(&self, me: P) -> Control<P, E> {
170 Control {
171 me,
172 sender: self.sender.clone(),
173 }
174 }
175
176 pub fn manager(&self) -> Manager<P, E> {
180 Manager {
181 oracle: self.clone(),
182 }
183 }
184
185 pub fn socket_manager(&self) -> SocketManager<P, E> {
189 SocketManager {
190 oracle: self.clone(),
191 }
192 }
193
194 pub async fn blocked(&self) -> Result<Vec<(P, P)>, Error> {
196 request(&self.sender, |result| Message::Blocked { result })
197 .await
198 .ok_or(Error::NetworkClosed)?
199 }
200
201 pub async fn limit_bandwidth(
208 &self,
209 public_key: P,
210 egress_cap: Option<usize>,
211 ingress_cap: Option<usize>,
212 ) -> Result<(), Error> {
213 request(&self.sender, move |result| Message::LimitBandwidth {
214 public_key,
215 egress_cap,
216 ingress_cap,
217 result,
218 })
219 .await
220 .ok_or(Error::NetworkClosed)
221 }
222
223 pub async fn add_link(&self, sender: P, receiver: P, config: Link) -> Result<(), Error> {
230 if sender == receiver {
232 return Err(Error::LinkingSelf);
233 }
234 if config.success_rate < 0.0 || config.success_rate > 1.0 {
235 return Err(Error::InvalidSuccessRate(config.success_rate));
236 }
237
238 let latency_ms = config.latency.as_secs_f64() * 1000.0;
240 let jitter_ms = config.jitter.as_secs_f64() * 1000.0;
241
242 let sampler = Normal::new(latency_ms, jitter_ms).unwrap();
244
245 request(&self.sender, move |result| Message::AddLink {
246 sender,
247 receiver,
248 sampler,
249 success_rate: config.success_rate,
250 result,
251 })
252 .await
253 .ok_or(Error::NetworkClosed)?
254 }
255
256 pub async fn remove_link(&self, sender: P, receiver: P) -> Result<(), Error> {
260 if sender == receiver {
262 return Err(Error::LinkingSelf);
263 }
264
265 request(&self.sender, move |result| Message::RemoveLink {
266 sender,
267 receiver,
268 result,
269 })
270 .await
271 .ok_or(Error::NetworkClosed)?
272 }
273
274 fn track(&self, id: u64, peers: TrackedPeers<P>) -> Feedback {
276 enqueue(&self.sender, Message::Track { id, peers })
277 }
278
279 async fn peer_set(&self, id: u64) -> Option<TrackedPeers<P>> {
281 request(&self.sender, move |response| Message::PeerSet {
282 id,
283 response,
284 })
285 .await
286 .flatten()
287 }
288
289 async fn subscribe(&self) -> PeerSetSubscription<P> {
291 request(&self.sender, |response| Message::Subscribe { response })
292 .await
293 .unwrap_or_else(|| {
294 let (_, rx) = mpsc::unbounded_channel();
295 rx
296 })
297 }
298}
299
300pub struct Manager<P: PublicKey, E: Clock> {
304 oracle: Oracle<P, E>,
306}
307
308impl<P: PublicKey, E: Clock> std::fmt::Debug for Manager<P, E> {
309 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
310 f.debug_struct("Manager").finish_non_exhaustive()
311 }
312}
313
314impl<P: PublicKey, E: Clock> Clone for Manager<P, E> {
315 fn clone(&self) -> Self {
316 Self {
317 oracle: self.oracle.clone(),
318 }
319 }
320}
321
322impl<P: PublicKey, E: Clock> crate::Provider for Manager<P, E> {
323 type PublicKey = P;
324
325 async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
326 self.oracle.peer_set(id).await
327 }
328
329 async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
330 self.oracle.subscribe().await
331 }
332}
333
334impl<P: PublicKey, E: Clock> crate::Manager for Manager<P, E> {
335 fn track<R>(&mut self, id: u64, peers: R) -> Feedback
336 where
337 R: Into<TrackedPeers<Self::PublicKey>> + Send,
338 {
339 self.oracle.track(id, peers.into())
340 }
341}
342
343pub struct SocketManager<P: PublicKey, E: Clock> {
353 oracle: Oracle<P, E>,
355}
356
357impl<P: PublicKey, E: Clock> std::fmt::Debug for SocketManager<P, E> {
358 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
359 f.debug_struct("SocketManager").finish_non_exhaustive()
360 }
361}
362
363impl<P: PublicKey, E: Clock> Clone for SocketManager<P, E> {
364 fn clone(&self) -> Self {
365 Self {
366 oracle: self.oracle.clone(),
367 }
368 }
369}
370
371impl<P: PublicKey, E: Clock> crate::Provider for SocketManager<P, E> {
372 type PublicKey = P;
373
374 async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
375 self.oracle.peer_set(id).await
376 }
377
378 async fn subscribe(&mut self) -> PeerSetSubscription<P> {
379 self.oracle.subscribe().await
380 }
381}
382
383impl<P: PublicKey, E: Clock> crate::AddressableManager for SocketManager<P, E> {
384 fn track<R>(&mut self, id: u64, peers: R) -> Feedback
385 where
386 R: Into<AddressableTrackedPeers<Self::PublicKey>> + Send,
387 {
388 let peers = peers.into();
390 self.oracle.track(
391 id,
392 TrackedPeers::new(peers.primary.into_keys(), peers.secondary.into_keys()),
393 )
394 }
395
396 fn overwrite(&mut self, _peers: Map<Self::PublicKey, Address>) -> Feedback {
397 Feedback::Ok
399 }
400}
401
402#[derive(Debug)]
404pub struct Control<P: PublicKey, E: Clock> {
405 me: P,
407
408 sender: mpsc::UnboundedSender<Message<P, E>>,
410}
411
412impl<P: PublicKey, E: Clock> Clone for Control<P, E> {
413 fn clone(&self) -> Self {
414 Self {
415 me: self.me.clone(),
416 sender: self.sender.clone(),
417 }
418 }
419}
420
421impl<P: PublicKey, E: Clock> Control<P, E> {
422 pub async fn register(
429 &self,
430 channel: Channel,
431 quota: Quota,
432 ) -> Result<(Sender<P, E>, Receiver<P>), Error> {
433 let public_key = self.me.clone();
434 request(&self.sender, move |result| Message::Register {
435 channel,
436 public_key,
437 quota,
438 result,
439 })
440 .await
441 .ok_or(Error::NetworkClosed)?
442 }
443}
444
445impl<P: PublicKey, E: Clock> crate::Blocker for Control<P, E> {
446 type PublicKey = P;
447
448 fn block(&mut self, public_key: P) -> Feedback {
449 enqueue(
450 &self.sender,
451 Message::Block {
452 from: self.me.clone(),
453 to: public_key,
454 },
455 )
456 }
457}