commonware_p2p/simulated/
ingress.rs

1use super::{Error, Receiver, Sender};
2use crate::{Address, Channel};
3use commonware_cryptography::PublicKey;
4use commonware_runtime::{Clock, Quota};
5use commonware_utils::{
6    channels::ring,
7    ordered::{Map, Set},
8};
9use futures::{
10    channel::{mpsc, oneshot},
11    SinkExt,
12};
13use rand_distr::Normal;
14use std::time::Duration;
15
16pub enum Message<P: PublicKey, E: Clock> {
17    Register {
18        channel: Channel,
19        public_key: P,
20        quota: Quota,
21        #[allow(clippy::type_complexity)]
22        result: oneshot::Sender<Result<(Sender<P, E>, Receiver<P>), Error>>,
23    },
24    Update {
25        id: u64,
26        peers: Set<P>,
27    },
28    PeerSet {
29        id: u64,
30        response: oneshot::Sender<Option<Set<P>>>,
31    },
32    Subscribe {
33        sender: mpsc::UnboundedSender<(u64, Set<P>, Set<P>)>,
34    },
35    SubscribeConnected {
36        response: oneshot::Sender<ring::Receiver<Vec<P>>>,
37    },
38    LimitBandwidth {
39        public_key: P,
40        egress_cap: Option<usize>,
41        ingress_cap: Option<usize>,
42        result: oneshot::Sender<()>,
43    },
44    AddLink {
45        sender: P,
46        receiver: P,
47        sampler: Normal<f64>,
48        success_rate: f64,
49        result: oneshot::Sender<Result<(), Error>>,
50    },
51    RemoveLink {
52        sender: P,
53        receiver: P,
54        result: oneshot::Sender<Result<(), Error>>,
55    },
56    Block {
57        /// The public key of the peer sending the block request.
58        from: P,
59        /// The public key of the peer to block.
60        to: P,
61    },
62    Blocked {
63        result: oneshot::Sender<Result<Vec<(P, P)>, Error>>,
64    },
65}
66
/// Describes a connection between two peers.
///
/// Links are unidirectional (and must be set up in both directions
/// for a bidirectional connection).
///
/// `latency` and `jitter` are the mean and standard deviation of the
/// distribution from which per-message delivery delay is drawn.
#[derive(Clone, Debug)]
pub struct Link {
    /// Mean latency for the delivery of a message.
    pub latency: Duration,

    /// Standard deviation of the latency for the delivery of a message.
    pub jitter: Duration,

    /// Probability of a message being delivered successfully (in range \[0,1\]).
    pub success_rate: f64,
}
82
83/// Interface for modifying the simulated network.
84///
85/// At any point, peers can be added/removed and links
86/// between said peers can be modified.
87#[derive(Debug)]
88pub struct Oracle<P: PublicKey, E: Clock> {
89    sender: mpsc::UnboundedSender<Message<P, E>>,
90}
91
92impl<P: PublicKey, E: Clock> Clone for Oracle<P, E> {
93    fn clone(&self) -> Self {
94        Self {
95            sender: self.sender.clone(),
96        }
97    }
98}
99
100impl<P: PublicKey, E: Clock> Oracle<P, E> {
101    /// Create a new instance of the oracle.
102    pub(crate) const fn new(sender: mpsc::UnboundedSender<Message<P, E>>) -> Self {
103        Self { sender }
104    }
105
106    /// Create a new [Control] interface for some peer.
107    pub fn control(&self, me: P) -> Control<P, E> {
108        Control {
109            me,
110            sender: self.sender.clone(),
111        }
112    }
113
114    /// Create a new [Manager].
115    ///
116    /// Useful for mocking [crate::authenticated::discovery].
117    pub fn manager(&self) -> Manager<P, E> {
118        Manager {
119            oracle: self.clone(),
120        }
121    }
122
123    /// Create a new [SocketManager].
124    ///
125    /// Useful for mocking [crate::authenticated::lookup].
126    pub fn socket_manager(&self) -> SocketManager<P, E> {
127        SocketManager {
128            oracle: self.clone(),
129        }
130    }
131
132    /// Return a list of all blocked peers.
133    pub async fn blocked(&mut self) -> Result<Vec<(P, P)>, Error> {
134        let (s, r) = oneshot::channel();
135        self.sender
136            .send(Message::Blocked { result: s })
137            .await
138            .map_err(|_| Error::NetworkClosed)?;
139        r.await.map_err(|_| Error::NetworkClosed)?
140    }
141
142    /// Set bandwidth limits for a peer.
143    ///
144    /// Bandwidth is specified for the peer's egress (upload) and ingress (download)
145    /// rates in bytes per second. Use `None` for unlimited bandwidth.
146    ///
147    /// Bandwidth can be specified before a peer is registered or linked.
148    pub async fn limit_bandwidth(
149        &mut self,
150        public_key: P,
151        egress_cap: Option<usize>,
152        ingress_cap: Option<usize>,
153    ) -> Result<(), Error> {
154        let (sender, receiver) = oneshot::channel();
155        self.sender
156            .send(Message::LimitBandwidth {
157                public_key,
158                egress_cap,
159                ingress_cap,
160                result: sender,
161            })
162            .await
163            .map_err(|_| Error::NetworkClosed)?;
164        receiver.await.map_err(|_| Error::NetworkClosed)
165    }
166
167    /// Create a unidirectional link between two peers.
168    ///
169    /// Link can be called multiple times for the same sender/receiver. The latest
170    /// setting will be used.
171    ///
172    /// Link can be called before a peer is registered or bandwidth is specified.
173    pub async fn add_link(&mut self, sender: P, receiver: P, config: Link) -> Result<(), Error> {
174        // Sanity checks
175        if sender == receiver {
176            return Err(Error::LinkingSelf);
177        }
178        if config.success_rate < 0.0 || config.success_rate > 1.0 {
179            return Err(Error::InvalidSuccessRate(config.success_rate));
180        }
181
182        // Convert Duration to milliseconds as f64 for the Normal distribution
183        let latency_ms = config.latency.as_secs_f64() * 1000.0;
184        let jitter_ms = config.jitter.as_secs_f64() * 1000.0;
185
186        // Create distribution
187        let sampler = Normal::new(latency_ms, jitter_ms).unwrap();
188
189        // Wait for update to complete
190        let (s, r) = oneshot::channel();
191        self.sender
192            .send(Message::AddLink {
193                sender,
194                receiver,
195                sampler,
196                success_rate: config.success_rate,
197                result: s,
198            })
199            .await
200            .map_err(|_| Error::NetworkClosed)?;
201        r.await.map_err(|_| Error::NetworkClosed)?
202    }
203
204    /// Remove a unidirectional link between two peers.
205    ///
206    /// If no link exists, this will return an error.
207    pub async fn remove_link(&mut self, sender: P, receiver: P) -> Result<(), Error> {
208        // Sanity checks
209        if sender == receiver {
210            return Err(Error::LinkingSelf);
211        }
212
213        // Wait for update to complete
214        let (s, r) = oneshot::channel();
215        self.sender
216            .send(Message::RemoveLink {
217                sender,
218                receiver,
219                result: s,
220            })
221            .await
222            .map_err(|_| Error::NetworkClosed)?;
223        r.await.map_err(|_| Error::NetworkClosed)?
224    }
225
226    /// Set the peers for a given id.
227    async fn update(&mut self, id: u64, peers: Set<P>) {
228        let _ = self.sender.send(Message::Update { id, peers }).await;
229    }
230
231    /// Get the peers for a given id.
232    async fn peer_set(&mut self, id: u64) -> Option<Set<P>> {
233        let (sender, receiver) = oneshot::channel();
234        self.sender
235            .send(Message::PeerSet {
236                id,
237                response: sender,
238            })
239            .await
240            .ok()?;
241        receiver.await.ok().flatten()
242    }
243
244    /// Subscribe to notifications when new peer sets are added.
245    async fn subscribe(&mut self) -> mpsc::UnboundedReceiver<(u64, Set<P>, Set<P>)> {
246        let (sender, receiver) = mpsc::unbounded();
247        let _ = self.sender.send(Message::Subscribe { sender }).await;
248        receiver
249    }
250}
251
252/// Implementation of [crate::Manager] for peers.
253///
254/// Useful for mocking [crate::authenticated::discovery].
255pub struct Manager<P: PublicKey, E: Clock> {
256    /// The oracle to send messages to.
257    oracle: Oracle<P, E>,
258}
259
260impl<P: PublicKey, E: Clock> std::fmt::Debug for Manager<P, E> {
261    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262        f.debug_struct("Manager").finish_non_exhaustive()
263    }
264}
265
266impl<P: PublicKey, E: Clock> Clone for Manager<P, E> {
267    fn clone(&self) -> Self {
268        Self {
269            oracle: self.oracle.clone(),
270        }
271    }
272}
273
274impl<P: PublicKey, E: Clock> crate::Manager for Manager<P, E> {
275    type PublicKey = P;
276    type Peers = Set<Self::PublicKey>;
277
278    async fn update(&mut self, id: u64, peers: Self::Peers) {
279        self.oracle.update(id, peers).await;
280    }
281
282    async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
283        self.oracle.peer_set(id).await
284    }
285
286    async fn subscribe(
287        &mut self,
288    ) -> mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)> {
289        self.oracle.subscribe().await
290    }
291}
292
293/// Implementation of [crate::Manager] for peers with [Address]es.
294///
295/// Useful for mocking [crate::authenticated::lookup].
296///
297/// # Note on [Address]
298///
299/// Because addresses are never exposed in [crate::simulated],
300/// there is nothing to assert submitted data against. We thus consider
301/// all addresses to be valid.
302pub struct SocketManager<P: PublicKey, E: Clock> {
303    /// The oracle to send messages to.
304    oracle: Oracle<P, E>,
305}
306
307impl<P: PublicKey, E: Clock> std::fmt::Debug for SocketManager<P, E> {
308    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
309        f.debug_struct("SocketManager").finish_non_exhaustive()
310    }
311}
312
313impl<P: PublicKey, E: Clock> Clone for SocketManager<P, E> {
314    fn clone(&self) -> Self {
315        Self {
316            oracle: self.oracle.clone(),
317        }
318    }
319}
320
321impl<P: PublicKey, E: Clock> crate::Manager for SocketManager<P, E> {
322    type PublicKey = P;
323    type Peers = Map<Self::PublicKey, Address>;
324
325    async fn update(&mut self, id: u64, peers: Self::Peers) {
326        // Ignore all addresses (simulated network doesn't use them)
327        self.oracle.update(id, peers.into_keys()).await;
328    }
329
330    async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
331        self.oracle.peer_set(id).await
332    }
333
334    async fn subscribe(
335        &mut self,
336    ) -> mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)> {
337        self.oracle.subscribe().await
338    }
339}
340
341/// Individual control interface for a peer in the simulated network.
342#[derive(Debug)]
343pub struct Control<P: PublicKey, E: Clock> {
344    /// The public key of the peer this control interface is for.
345    me: P,
346
347    /// Sender for messages to the oracle.
348    sender: mpsc::UnboundedSender<Message<P, E>>,
349}
350
351impl<P: PublicKey, E: Clock> Clone for Control<P, E> {
352    fn clone(&self) -> Self {
353        Self {
354            me: self.me.clone(),
355            sender: self.sender.clone(),
356        }
357    }
358}
359
360impl<P: PublicKey, E: Clock> Control<P, E> {
361    /// Register the communication interfaces for the peer over a given [Channel].
362    ///
363    /// # Rate Limiting
364    ///
365    /// The `quota` parameter specifies the rate limit for outbound messages to each peer.
366    /// Recipients that exceed their rate limit will be skipped when sending.
367    pub async fn register(
368        &mut self,
369        channel: Channel,
370        quota: Quota,
371    ) -> Result<(Sender<P, E>, Receiver<P>), Error> {
372        let (tx, rx) = oneshot::channel();
373        self.sender
374            .send(Message::Register {
375                channel,
376                public_key: self.me.clone(),
377                quota,
378                result: tx,
379            })
380            .await
381            .map_err(|_| Error::NetworkClosed)?;
382        rx.await.map_err(|_| Error::NetworkClosed)?
383    }
384}
385
386impl<P: PublicKey, E: Clock> crate::Blocker for Control<P, E> {
387    type PublicKey = P;
388
389    async fn block(&mut self, public_key: P) {
390        let _ = self
391            .sender
392            .send(Message::Block {
393                from: self.me.clone(),
394                to: public_key,
395            })
396            .await;
397    }
398}