commonware_p2p/simulated/
ingress.rs

1use super::{Error, Receiver, Sender};
2use crate::Channel;
3use commonware_cryptography::PublicKey;
4use commonware_utils::set::Ordered;
5use futures::{
6    channel::{mpsc, oneshot},
7    SinkExt,
8};
9use rand_distr::Normal;
10use std::time::Duration;
11
/// Requests that can be submitted over the channel held by [Oracle] and [Control].
///
/// Variants that expect an answer carry a [oneshot::Sender] on which the
/// reply is delivered.
pub enum Message<P: PublicKey> {
    /// Provide the peers that belong to a peer set.
    ///
    /// Sent by the `Manager::update` implementation on [Oracle].
    Update {
        /// Identifier of the peer set being provided.
        peer_set: u64,
        /// The peers that make up the peer set.
        peers: Ordered<P>,
    },
    /// Register a peer on a [Channel].
    ///
    /// Sent by [Control::register].
    Register {
        /// The channel to register on.
        channel: Channel,
        /// The public key of the peer being registered.
        public_key: P,
        /// Where the reply is delivered: the peer's sender/receiver pair on
        /// success, or an [Error].
        #[allow(clippy::type_complexity)]
        result: oneshot::Sender<Result<(Sender<P>, Receiver<P>), Error>>,
    },
    /// Look up a peer set.
    ///
    /// Sent by the `Manager::peer_set` implementation on [Oracle].
    PeerSet {
        /// Identifier of the requested peer set.
        index: u64,
        /// Where the reply is delivered; the reply is optional
        /// (presumably `None` when the peer set is unknown; confirm against
        /// the message handler).
        response: oneshot::Sender<Option<Ordered<P>>>,
    },
    /// Request a stream of peer set notifications.
    ///
    /// Sent by the `Manager::subscribe` implementation on [Oracle].
    Subscribe {
        /// Where the notification receiver is delivered.
        ///
        /// NOTE(review): the meaning of the tuple elements is not established
        /// in this file; presumably (peer set id, peers in that set, all
        /// tracked peers). Confirm against the message handler.
        #[allow(clippy::type_complexity)]
        response: oneshot::Sender<mpsc::UnboundedReceiver<(u64, Ordered<P>, Ordered<P>)>>,
    },
    /// Set bandwidth limits for a peer.
    ///
    /// Sent by [Oracle::limit_bandwidth], which documents the caps as bytes
    /// per second with `None` meaning unlimited.
    LimitBandwidth {
        /// The peer whose bandwidth is being limited.
        public_key: P,
        /// Egress (upload) cap.
        egress_cap: Option<usize>,
        /// Ingress (download) cap.
        ingress_cap: Option<usize>,
        /// Where the outcome of the request is delivered.
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Create a unidirectional link between two peers.
    ///
    /// Sent by [Oracle::add_link].
    AddLink {
        /// The peer on the sending side of the link.
        sender: P,
        /// The peer on the receiving side of the link.
        receiver: P,
        /// Latency distribution; [Oracle::add_link] builds it from the link's
        /// mean latency and jitter, both expressed in milliseconds.
        sampler: Normal<f64>,
        /// Probability of a message being delivered successfully;
        /// [Oracle::add_link] rejects values below 0 or above 1 before sending.
        success_rate: f64,
        /// Where the outcome of the request is delivered.
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Remove a unidirectional link between two peers.
    ///
    /// Sent by [Oracle::remove_link].
    RemoveLink {
        /// The peer on the sending side of the link.
        sender: P,
        /// The peer on the receiving side of the link.
        receiver: P,
        /// Where the outcome of the request is delivered.
        result: oneshot::Sender<Result<(), Error>>,
    },
    /// Record that one peer blocks another.
    ///
    /// Sent by the `Blocker::block` implementation on [Control]; no reply is
    /// expected.
    Block {
        /// The public key of the peer sending the block request.
        from: P,
        /// The public key of the peer to block.
        to: P,
    },
    /// Request the list of blocked peers.
    ///
    /// Sent by [Oracle::blocked].
    Blocked {
        /// Where the reply is delivered.
        ///
        /// NOTE(review): each pair is presumably `(from, to)`, mirroring the
        /// fields of [Message::Block]. Confirm against the message handler.
        result: oneshot::Sender<Result<Vec<(P, P)>, Error>>,
    },
}
59
/// Describes a connection between two peers.
///
/// Links are unidirectional (and must be set up in both directions
/// for a bidirectional connection).
///
/// `latency` and `jitter` are the mean and standard deviation of the
/// message delivery delay.
#[derive(Debug, Clone)]
pub struct Link {
    /// Mean latency for the delivery of a message.
    pub latency: Duration,

    /// Standard deviation of the latency for the delivery of a message.
    pub jitter: Duration,

    /// Probability of a message being delivered successfully (in range \[0,1\]).
    pub success_rate: f64,
}
75
76/// Interface for modifying the simulated network.
77///
78/// At any point, peers can be added/removed and links
79/// between said peers can be modified.
80#[derive(Debug, Clone)]
81pub struct Oracle<P: PublicKey> {
82    sender: mpsc::UnboundedSender<Message<P>>,
83}
84
85impl<P: PublicKey> Oracle<P> {
86    /// Create a new instance of the oracle.
87    pub(crate) fn new(sender: mpsc::UnboundedSender<Message<P>>) -> Self {
88        Self { sender }
89    }
90
91    /// Spawn an individual control interface for a peer in the simulated network.
92    pub fn control(&self, me: P) -> Control<P> {
93        Control {
94            me,
95            sender: self.sender.clone(),
96        }
97    }
98
99    /// Return a list of all blocked peers.
100    pub async fn blocked(&mut self) -> Result<Vec<(P, P)>, Error> {
101        let (s, r) = oneshot::channel();
102        self.sender
103            .send(Message::Blocked { result: s })
104            .await
105            .map_err(|_| Error::NetworkClosed)?;
106        r.await.map_err(|_| Error::NetworkClosed)?
107    }
108
109    /// Set bandwidth limits for a peer.
110    ///
111    /// Bandwidth is specified for the peer's egress (upload) and ingress (download)
112    /// rates in bytes per second. Use `None` for unlimited bandwidth.
113    pub async fn limit_bandwidth(
114        &mut self,
115        public_key: P,
116        egress_cap: Option<usize>,
117        ingress_cap: Option<usize>,
118    ) -> Result<(), Error> {
119        let (sender, receiver) = oneshot::channel();
120        self.sender
121            .send(Message::LimitBandwidth {
122                public_key,
123                egress_cap,
124                ingress_cap,
125                result: sender,
126            })
127            .await
128            .map_err(|_| Error::NetworkClosed)?;
129        receiver.await.map_err(|_| Error::NetworkClosed)?
130    }
131
132    /// Create a unidirectional link between two peers.
133    ///
134    /// Link can be called multiple times for the same sender/receiver. The latest
135    /// setting will be used.
136    pub async fn add_link(&mut self, sender: P, receiver: P, config: Link) -> Result<(), Error> {
137        // Sanity checks
138        if sender == receiver {
139            return Err(Error::LinkingSelf);
140        }
141        if config.success_rate < 0.0 || config.success_rate > 1.0 {
142            return Err(Error::InvalidSuccessRate(config.success_rate));
143        }
144
145        // Convert Duration to milliseconds as f64 for the Normal distribution
146        let latency_ms = config.latency.as_secs_f64() * 1000.0;
147        let jitter_ms = config.jitter.as_secs_f64() * 1000.0;
148
149        // Create distribution
150        let sampler = Normal::new(latency_ms, jitter_ms).unwrap();
151
152        // Wait for update to complete
153        let (s, r) = oneshot::channel();
154        self.sender
155            .send(Message::AddLink {
156                sender,
157                receiver,
158                sampler,
159                success_rate: config.success_rate,
160                result: s,
161            })
162            .await
163            .map_err(|_| Error::NetworkClosed)?;
164        r.await.map_err(|_| Error::NetworkClosed)?
165    }
166
167    /// Remove a unidirectional link between two peers.
168    ///
169    /// If no link exists, this will return an error.
170    pub async fn remove_link(&mut self, sender: P, receiver: P) -> Result<(), Error> {
171        // Sanity checks
172        if sender == receiver {
173            return Err(Error::LinkingSelf);
174        }
175
176        // Wait for update to complete
177        let (s, r) = oneshot::channel();
178        self.sender
179            .send(Message::RemoveLink {
180                sender,
181                receiver,
182                result: s,
183            })
184            .await
185            .map_err(|_| Error::NetworkClosed)?;
186        r.await.map_err(|_| Error::NetworkClosed)?
187    }
188}
189
190impl<P: PublicKey> crate::Manager for Oracle<P> {
191    type PublicKey = P;
192    type Peers = Ordered<Self::PublicKey>;
193
194    async fn update(&mut self, peer_set: u64, peers: Self::Peers) {
195        self.sender
196            .send(Message::Update { peer_set, peers })
197            .await
198            .unwrap();
199    }
200
201    async fn peer_set(&mut self, id: u64) -> Option<Ordered<Self::PublicKey>> {
202        let (sender, receiver) = oneshot::channel();
203        self.sender
204            .send(Message::PeerSet {
205                index: id,
206                response: sender,
207            })
208            .await
209            .unwrap();
210        receiver.await.unwrap()
211    }
212
213    async fn subscribe(
214        &mut self,
215    ) -> mpsc::UnboundedReceiver<(u64, Ordered<Self::PublicKey>, Ordered<Self::PublicKey>)> {
216        let (sender, receiver) = oneshot::channel();
217        self.sender
218            .send(Message::Subscribe { response: sender })
219            .await
220            .unwrap();
221        receiver.await.unwrap()
222    }
223}
224
225/// Individual control interface for a peer in the simulated network.
226#[derive(Debug, Clone)]
227pub struct Control<P: PublicKey> {
228    /// The public key of the peer this control interface is for.
229    me: P,
230
231    /// Sender for messages to the oracle.
232    sender: mpsc::UnboundedSender<Message<P>>,
233}
234
235impl<P: PublicKey> Control<P> {
236    /// Register the communication interfaces for the peer over a given [Channel].
237    pub async fn register(&mut self, channel: Channel) -> Result<(Sender<P>, Receiver<P>), Error> {
238        let (tx, rx) = oneshot::channel();
239        self.sender
240            .send(Message::Register {
241                channel,
242                public_key: self.me.clone(),
243                result: tx,
244            })
245            .await
246            .unwrap();
247        rx.await.unwrap()
248    }
249}
250
251impl<P: PublicKey> crate::Blocker for Control<P> {
252    type PublicKey = P;
253
254    async fn block(&mut self, public_key: P) {
255        let _ = self
256            .sender
257            .send(Message::Block {
258                from: self.me.clone(),
259                to: public_key,
260            })
261            .await;
262    }
263}