commonware_p2p/simulated/
ingress.rs1use super::{Error, Receiver, Sender};
2use crate::Channel;
3use commonware_cryptography::PublicKey;
4use commonware_utils::set::Ordered;
5use futures::{
6 channel::{mpsc, oneshot},
7 SinkExt,
8};
9use rand_distr::Normal;
10use std::time::Duration;
11
12pub enum Message<P: PublicKey> {
13 Update {
14 peer_set: u64,
15 peers: Ordered<P>,
16 },
17 Register {
18 channel: Channel,
19 public_key: P,
20 #[allow(clippy::type_complexity)]
21 result: oneshot::Sender<Result<(Sender<P>, Receiver<P>), Error>>,
22 },
23 PeerSet {
24 index: u64,
25 response: oneshot::Sender<Option<Ordered<P>>>,
26 },
27 Subscribe {
28 #[allow(clippy::type_complexity)]
29 response: oneshot::Sender<mpsc::UnboundedReceiver<(u64, Ordered<P>, Ordered<P>)>>,
30 },
31 LimitBandwidth {
32 public_key: P,
33 egress_cap: Option<usize>,
34 ingress_cap: Option<usize>,
35 result: oneshot::Sender<Result<(), Error>>,
36 },
37 AddLink {
38 sender: P,
39 receiver: P,
40 sampler: Normal<f64>,
41 success_rate: f64,
42 result: oneshot::Sender<Result<(), Error>>,
43 },
44 RemoveLink {
45 sender: P,
46 receiver: P,
47 result: oneshot::Sender<Result<(), Error>>,
48 },
49 Block {
50 from: P,
52 to: P,
54 },
55 Blocked {
56 result: oneshot::Sender<Result<Vec<(P, P)>, Error>>,
57 },
58}
59
60#[derive(Clone)]
65pub struct Link {
66 pub latency: Duration,
68
69 pub jitter: Duration,
71
72 pub success_rate: f64,
74}
75
76#[derive(Debug, Clone)]
81pub struct Oracle<P: PublicKey> {
82 sender: mpsc::UnboundedSender<Message<P>>,
83}
84
85impl<P: PublicKey> Oracle<P> {
86 pub(crate) fn new(sender: mpsc::UnboundedSender<Message<P>>) -> Self {
88 Self { sender }
89 }
90
91 pub fn control(&self, me: P) -> Control<P> {
93 Control {
94 me,
95 sender: self.sender.clone(),
96 }
97 }
98
99 pub async fn blocked(&mut self) -> Result<Vec<(P, P)>, Error> {
101 let (s, r) = oneshot::channel();
102 self.sender
103 .send(Message::Blocked { result: s })
104 .await
105 .map_err(|_| Error::NetworkClosed)?;
106 r.await.map_err(|_| Error::NetworkClosed)?
107 }
108
109 pub async fn limit_bandwidth(
114 &mut self,
115 public_key: P,
116 egress_cap: Option<usize>,
117 ingress_cap: Option<usize>,
118 ) -> Result<(), Error> {
119 let (sender, receiver) = oneshot::channel();
120 self.sender
121 .send(Message::LimitBandwidth {
122 public_key,
123 egress_cap,
124 ingress_cap,
125 result: sender,
126 })
127 .await
128 .map_err(|_| Error::NetworkClosed)?;
129 receiver.await.map_err(|_| Error::NetworkClosed)?
130 }
131
132 pub async fn add_link(&mut self, sender: P, receiver: P, config: Link) -> Result<(), Error> {
137 if sender == receiver {
139 return Err(Error::LinkingSelf);
140 }
141 if config.success_rate < 0.0 || config.success_rate > 1.0 {
142 return Err(Error::InvalidSuccessRate(config.success_rate));
143 }
144
145 let latency_ms = config.latency.as_secs_f64() * 1000.0;
147 let jitter_ms = config.jitter.as_secs_f64() * 1000.0;
148
149 let sampler = Normal::new(latency_ms, jitter_ms).unwrap();
151
152 let (s, r) = oneshot::channel();
154 self.sender
155 .send(Message::AddLink {
156 sender,
157 receiver,
158 sampler,
159 success_rate: config.success_rate,
160 result: s,
161 })
162 .await
163 .map_err(|_| Error::NetworkClosed)?;
164 r.await.map_err(|_| Error::NetworkClosed)?
165 }
166
167 pub async fn remove_link(&mut self, sender: P, receiver: P) -> Result<(), Error> {
171 if sender == receiver {
173 return Err(Error::LinkingSelf);
174 }
175
176 let (s, r) = oneshot::channel();
178 self.sender
179 .send(Message::RemoveLink {
180 sender,
181 receiver,
182 result: s,
183 })
184 .await
185 .map_err(|_| Error::NetworkClosed)?;
186 r.await.map_err(|_| Error::NetworkClosed)?
187 }
188}
189
190impl<P: PublicKey> crate::Manager for Oracle<P> {
191 type PublicKey = P;
192 type Peers = Ordered<Self::PublicKey>;
193
194 async fn update(&mut self, peer_set: u64, peers: Self::Peers) {
195 self.sender
196 .send(Message::Update { peer_set, peers })
197 .await
198 .unwrap();
199 }
200
201 async fn peer_set(&mut self, id: u64) -> Option<Ordered<Self::PublicKey>> {
202 let (sender, receiver) = oneshot::channel();
203 self.sender
204 .send(Message::PeerSet {
205 index: id,
206 response: sender,
207 })
208 .await
209 .unwrap();
210 receiver.await.unwrap()
211 }
212
213 async fn subscribe(
214 &mut self,
215 ) -> mpsc::UnboundedReceiver<(u64, Ordered<Self::PublicKey>, Ordered<Self::PublicKey>)> {
216 let (sender, receiver) = oneshot::channel();
217 self.sender
218 .send(Message::Subscribe { response: sender })
219 .await
220 .unwrap();
221 receiver.await.unwrap()
222 }
223}
224
225#[derive(Debug, Clone)]
227pub struct Control<P: PublicKey> {
228 me: P,
230
231 sender: mpsc::UnboundedSender<Message<P>>,
233}
234
235impl<P: PublicKey> Control<P> {
236 pub async fn register(&mut self, channel: Channel) -> Result<(Sender<P>, Receiver<P>), Error> {
238 let (tx, rx) = oneshot::channel();
239 self.sender
240 .send(Message::Register {
241 channel,
242 public_key: self.me.clone(),
243 result: tx,
244 })
245 .await
246 .unwrap();
247 rx.await.unwrap()
248 }
249}
250
251impl<P: PublicKey> crate::Blocker for Control<P> {
252 type PublicKey = P;
253
254 async fn block(&mut self, public_key: P) {
255 let _ = self
256 .sender
257 .send(Message::Block {
258 from: self.me.clone(),
259 to: public_key,
260 })
261 .await;
262 }
263}