commonware_p2p/simulated/
ingress.rs1use super::{Error, Receiver, Sender};
2use crate::Channel;
3use commonware_utils::Array;
4use futures::{
5 channel::{mpsc, oneshot},
6 SinkExt,
7};
8use rand_distr::Normal;
9
10pub enum Message<P: Array> {
11 Register {
12 public_key: P,
13 channel: Channel,
14 #[allow(clippy::type_complexity)]
15 result: oneshot::Sender<Result<(Sender<P>, Receiver<P>), Error>>,
16 },
17 AddLink {
18 sender: P,
19 receiver: P,
20 sampler: Normal<f64>,
21 success_rate: f64,
22 result: oneshot::Sender<Result<(), Error>>,
23 },
24 RemoveLink {
25 sender: P,
26 receiver: P,
27 result: oneshot::Sender<Result<(), Error>>,
28 },
29 Block {
30 from: P,
32 to: P,
34 },
35 Blocked {
36 result: oneshot::Sender<Result<Vec<(P, P)>, Error>>,
37 },
38}
39
40#[derive(Clone)]
45pub struct Link {
46 pub latency: f64,
48
49 pub jitter: f64,
51
52 pub success_rate: f64,
54}
55
56#[derive(Clone)]
61pub struct Oracle<P: Array> {
62 sender: mpsc::UnboundedSender<Message<P>>,
63}
64
65impl<P: Array> Oracle<P> {
66 pub(crate) fn new(sender: mpsc::UnboundedSender<Message<P>>) -> Self {
68 Self { sender }
69 }
70
71 pub fn control(&self, me: P) -> Control<P> {
73 Control {
74 me,
75 sender: self.sender.clone(),
76 }
77 }
78
79 pub async fn blocked(&mut self) -> Result<Vec<(P, P)>, Error> {
81 let (s, r) = oneshot::channel();
82 self.sender
83 .send(Message::Blocked { result: s })
84 .await
85 .map_err(|_| Error::NetworkClosed)?;
86 r.await.map_err(|_| Error::NetworkClosed)?
87 }
88
89 pub async fn register(
94 &mut self,
95 public_key: P,
96 channel: Channel,
97 ) -> Result<(Sender<P>, Receiver<P>), Error> {
98 let (sender, receiver) = oneshot::channel();
99 self.sender
100 .send(Message::Register {
101 public_key,
102 channel,
103 result: sender,
104 })
105 .await
106 .map_err(|_| Error::NetworkClosed)?;
107 receiver.await.map_err(|_| Error::NetworkClosed)?
108 }
109
110 pub async fn add_link(&mut self, sender: P, receiver: P, config: Link) -> Result<(), Error> {
115 if sender == receiver {
117 return Err(Error::LinkingSelf);
118 }
119 if config.success_rate < 0.0 || config.success_rate > 1.0 {
120 return Err(Error::InvalidSuccessRate(config.success_rate));
121 }
122 if config.latency < 0.0 || config.jitter < 0.0 {
123 return Err(Error::InvalidBehavior(config.latency, config.jitter));
124 }
125
126 let sampler = Normal::new(config.latency, config.jitter)
128 .map_err(|_| Error::InvalidBehavior(config.latency, config.jitter))?;
129
130 let (s, r) = oneshot::channel();
132 self.sender
133 .send(Message::AddLink {
134 sender,
135 receiver,
136 sampler,
137 success_rate: config.success_rate,
138 result: s,
139 })
140 .await
141 .map_err(|_| Error::NetworkClosed)?;
142 r.await.map_err(|_| Error::NetworkClosed)?
143 }
144
145 pub async fn remove_link(&mut self, sender: P, receiver: P) -> Result<(), Error> {
149 if sender == receiver {
151 return Err(Error::LinkingSelf);
152 }
153
154 let (s, r) = oneshot::channel();
156 self.sender
157 .send(Message::RemoveLink {
158 sender,
159 receiver,
160 result: s,
161 })
162 .await
163 .map_err(|_| Error::NetworkClosed)?;
164 r.await.map_err(|_| Error::NetworkClosed)?
165 }
166}
167
168#[derive(Clone)]
170pub struct Control<P: Array> {
171 me: P,
173
174 sender: mpsc::UnboundedSender<Message<P>>,
176}
177
178impl<P: Array> crate::Blocker for Control<P> {
179 type PublicKey = P;
180
181 async fn block(&mut self, public_key: P) {
182 let _ = self
183 .sender
184 .send(Message::Block {
185 from: self.me.clone(),
186 to: public_key,
187 })
188 .await;
189 }
190}