commonware_p2p/simulated/
ingress.rs1use super::{Error, Receiver, Sender};
2use crate::Channel;
3use commonware_utils::Array;
4use futures::{
5 channel::{mpsc, oneshot},
6 SinkExt,
7};
8use rand_distr::Normal;
9
10pub enum Message<P: Array> {
11 Register {
12 public_key: P,
13 channel: Channel,
14 #[allow(clippy::type_complexity)]
15 result: oneshot::Sender<Result<(Sender<P>, Receiver<P>), Error>>,
16 },
17 AddLink {
18 sender: P,
19 receiver: P,
20 sampler: Normal<f64>,
21 success_rate: f64,
22 result: oneshot::Sender<Result<(), Error>>,
23 },
24 RemoveLink {
25 sender: P,
26 receiver: P,
27 result: oneshot::Sender<Result<(), Error>>,
28 },
29}
30
/// Parameters describing a link between two peers, consumed by
/// `Oracle::add_link`.
///
/// `latency` and `jitter` are used as the mean and standard deviation of a
/// normal distribution. Their unit is not established in this file
/// (presumably milliseconds; confirm against the network implementation).
#[derive(Debug, Clone, PartialEq)]
pub struct Link {
    /// Mean of the latency distribution. Must not be negative or NaN.
    pub latency: f64,

    /// Standard deviation of the latency distribution. Must not be negative
    /// or NaN.
    pub jitter: f64,

    /// Delivery success rate. Must lie in the inclusive range `[0, 1]`.
    pub success_rate: f64,
}
46
47#[derive(Clone)]
52pub struct Oracle<P: Array> {
53 sender: mpsc::UnboundedSender<Message<P>>,
54}
55
56impl<P: Array> Oracle<P> {
57 pub(crate) fn new(sender: mpsc::UnboundedSender<Message<P>>) -> Self {
58 Self { sender }
59 }
60
61 pub async fn register(
66 &mut self,
67 public_key: P,
68 channel: Channel,
69 ) -> Result<(Sender<P>, Receiver<P>), Error> {
70 let (sender, receiver) = oneshot::channel();
71 self.sender
72 .send(Message::Register {
73 public_key,
74 channel,
75 result: sender,
76 })
77 .await
78 .map_err(|_| Error::NetworkClosed)?;
79 receiver.await.map_err(|_| Error::NetworkClosed)?
80 }
81
82 pub async fn add_link(&mut self, sender: P, receiver: P, config: Link) -> Result<(), Error> {
87 if sender == receiver {
89 return Err(Error::LinkingSelf);
90 }
91 if config.success_rate < 0.0 || config.success_rate > 1.0 {
92 return Err(Error::InvalidSuccessRate(config.success_rate));
93 }
94 if config.latency < 0.0 || config.jitter < 0.0 {
95 return Err(Error::InvalidBehavior(config.latency, config.jitter));
96 }
97
98 let sampler = Normal::new(config.latency, config.jitter)
100 .map_err(|_| Error::InvalidBehavior(config.latency, config.jitter))?;
101
102 let (s, r) = oneshot::channel();
104 self.sender
105 .send(Message::AddLink {
106 sender,
107 receiver,
108 sampler,
109 success_rate: config.success_rate,
110 result: s,
111 })
112 .await
113 .map_err(|_| Error::NetworkClosed)?;
114 r.await.map_err(|_| Error::NetworkClosed)?
115 }
116
117 pub async fn remove_link(&mut self, sender: P, receiver: P) -> Result<(), Error> {
121 if sender == receiver {
123 return Err(Error::LinkingSelf);
124 }
125
126 let (s, r) = oneshot::channel();
128 self.sender
129 .send(Message::RemoveLink {
130 sender,
131 receiver,
132 result: s,
133 })
134 .await
135 .map_err(|_| Error::NetworkClosed)?;
136 r.await.map_err(|_| Error::NetworkClosed)?
137 }
138}