commonware_p2p/simulated/
ingress.rs1use super::{Error, Receiver, Sender};
2use crate::Channel;
3use commonware_cryptography::PublicKey;
4use futures::{
5 channel::{mpsc, oneshot},
6 SinkExt,
7};
8use rand_distr::Normal;
9use std::time::Duration;
10
11pub enum Message<P: PublicKey> {
12 Register {
13 public_key: P,
14 channel: Channel,
15 #[allow(clippy::type_complexity)]
16 result: oneshot::Sender<Result<(Sender<P>, Receiver<P>), Error>>,
17 },
18 SetBandwidth {
19 public_key: P,
20 egress_bps: usize,
21 ingress_bps: usize,
22 result: oneshot::Sender<Result<(), Error>>,
23 },
24 AddLink {
25 sender: P,
26 receiver: P,
27 sampler: Normal<f64>,
28 success_rate: f64,
29 result: oneshot::Sender<Result<(), Error>>,
30 },
31 RemoveLink {
32 sender: P,
33 receiver: P,
34 result: oneshot::Sender<Result<(), Error>>,
35 },
36 Block {
37 from: P,
39 to: P,
41 },
42 Blocked {
43 result: oneshot::Sender<Result<Vec<(P, P)>, Error>>,
44 },
45}
46
47#[derive(Clone)]
52pub struct Link {
53 pub latency: Duration,
55
56 pub jitter: Duration,
58
59 pub success_rate: f64,
61}
62
63#[derive(Clone)]
68pub struct Oracle<P: PublicKey> {
69 sender: mpsc::UnboundedSender<Message<P>>,
70}
71
72impl<P: PublicKey> Oracle<P> {
73 pub(crate) fn new(sender: mpsc::UnboundedSender<Message<P>>) -> Self {
75 Self { sender }
76 }
77
78 pub fn control(&self, me: P) -> Control<P> {
80 Control {
81 me,
82 sender: self.sender.clone(),
83 }
84 }
85
86 pub async fn blocked(&mut self) -> Result<Vec<(P, P)>, Error> {
88 let (s, r) = oneshot::channel();
89 self.sender
90 .send(Message::Blocked { result: s })
91 .await
92 .map_err(|_| Error::NetworkClosed)?;
93 r.await.map_err(|_| Error::NetworkClosed)?
94 }
95
96 pub async fn register(
101 &mut self,
102 public_key: P,
103 channel: Channel,
104 ) -> Result<(Sender<P>, Receiver<P>), Error> {
105 let (sender, receiver) = oneshot::channel();
106 self.sender
107 .send(Message::Register {
108 public_key,
109 channel,
110 result: sender,
111 })
112 .await
113 .map_err(|_| Error::NetworkClosed)?;
114 receiver.await.map_err(|_| Error::NetworkClosed)?
115 }
116
117 pub async fn set_bandwidth(
122 &mut self,
123 public_key: P,
124 egress_bps: usize,
125 ingress_bps: usize,
126 ) -> Result<(), Error> {
127 let (sender, receiver) = oneshot::channel();
128 self.sender
129 .send(Message::SetBandwidth {
130 public_key,
131 egress_bps,
132 ingress_bps,
133 result: sender,
134 })
135 .await
136 .map_err(|_| Error::NetworkClosed)?;
137 receiver.await.map_err(|_| Error::NetworkClosed)?
138 }
139
140 pub async fn add_link(&mut self, sender: P, receiver: P, config: Link) -> Result<(), Error> {
145 if sender == receiver {
147 return Err(Error::LinkingSelf);
148 }
149 if config.success_rate < 0.0 || config.success_rate > 1.0 {
150 return Err(Error::InvalidSuccessRate(config.success_rate));
151 }
152
153 let latency_ms = config.latency.as_secs_f64() * 1000.0;
155 let jitter_ms = config.jitter.as_secs_f64() * 1000.0;
156
157 let sampler = Normal::new(latency_ms, jitter_ms).unwrap();
159
160 let (s, r) = oneshot::channel();
162 self.sender
163 .send(Message::AddLink {
164 sender,
165 receiver,
166 sampler,
167 success_rate: config.success_rate,
168 result: s,
169 })
170 .await
171 .map_err(|_| Error::NetworkClosed)?;
172 r.await.map_err(|_| Error::NetworkClosed)?
173 }
174
175 pub async fn remove_link(&mut self, sender: P, receiver: P) -> Result<(), Error> {
179 if sender == receiver {
181 return Err(Error::LinkingSelf);
182 }
183
184 let (s, r) = oneshot::channel();
186 self.sender
187 .send(Message::RemoveLink {
188 sender,
189 receiver,
190 result: s,
191 })
192 .await
193 .map_err(|_| Error::NetworkClosed)?;
194 r.await.map_err(|_| Error::NetworkClosed)?
195 }
196}
197
198#[derive(Clone)]
200pub struct Control<P: PublicKey> {
201 me: P,
203
204 sender: mpsc::UnboundedSender<Message<P>>,
206}
207
208impl<P: PublicKey> crate::Blocker for Control<P> {
209 type PublicKey = P;
210
211 async fn block(&mut self, public_key: P) {
212 let _ = self
213 .sender
214 .send(Message::Block {
215 from: self.me.clone(),
216 to: public_key,
217 })
218 .await;
219 }
220}