commonware_p2p/simulated/
ingress.rs

1use super::{Error, Receiver, Sender};
2use crate::{authenticated::UnboundedMailbox, Address, Channel};
3use commonware_cryptography::PublicKey;
4use commonware_runtime::{Clock, Quota};
5use commonware_utils::{
6    channels::{fallible::FallibleExt, ring},
7    ordered::{Map, Set},
8};
9use futures::channel::{mpsc, oneshot};
10use rand_distr::Normal;
11use std::time::Duration;
12
13pub enum Message<P: PublicKey, E: Clock> {
14    Register {
15        channel: Channel,
16        public_key: P,
17        quota: Quota,
18        #[allow(clippy::type_complexity)]
19        result: oneshot::Sender<Result<(Sender<P, E>, Receiver<P>), Error>>,
20    },
21    Update {
22        id: u64,
23        peers: Set<P>,
24    },
25    PeerSet {
26        id: u64,
27        response: oneshot::Sender<Option<Set<P>>>,
28    },
29    Subscribe {
30        sender: mpsc::UnboundedSender<(u64, Set<P>, Set<P>)>,
31    },
32    SubscribeConnected {
33        response: oneshot::Sender<ring::Receiver<Vec<P>>>,
34    },
35    LimitBandwidth {
36        public_key: P,
37        egress_cap: Option<usize>,
38        ingress_cap: Option<usize>,
39        result: oneshot::Sender<()>,
40    },
41    AddLink {
42        sender: P,
43        receiver: P,
44        sampler: Normal<f64>,
45        success_rate: f64,
46        result: oneshot::Sender<Result<(), Error>>,
47    },
48    RemoveLink {
49        sender: P,
50        receiver: P,
51        result: oneshot::Sender<Result<(), Error>>,
52    },
53    Block {
54        /// The public key of the peer sending the block request.
55        from: P,
56        /// The public key of the peer to block.
57        to: P,
58    },
59    Blocked {
60        result: oneshot::Sender<Result<Vec<(P, P)>, Error>>,
61    },
62}
63
64impl<P: PublicKey, E: Clock> std::fmt::Debug for Message<P, E> {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        match self {
67            Self::Register { .. } => f.debug_struct("Register").finish_non_exhaustive(),
68            Self::Update { id, .. } => f
69                .debug_struct("Update")
70                .field("id", id)
71                .finish_non_exhaustive(),
72            Self::PeerSet { id, .. } => f
73                .debug_struct("PeerSet")
74                .field("id", id)
75                .finish_non_exhaustive(),
76            Self::Subscribe { .. } => f.debug_struct("Subscribe").finish_non_exhaustive(),
77            Self::SubscribeConnected { .. } => {
78                f.debug_struct("SubscribeConnected").finish_non_exhaustive()
79            }
80            Self::LimitBandwidth { .. } => f.debug_struct("LimitBandwidth").finish_non_exhaustive(),
81            Self::AddLink { .. } => f.debug_struct("AddLink").finish_non_exhaustive(),
82            Self::RemoveLink { .. } => f.debug_struct("RemoveLink").finish_non_exhaustive(),
83            Self::Block { from, to } => f
84                .debug_struct("Block")
85                .field("from", from)
86                .field("to", to)
87                .finish(),
88            Self::Blocked { .. } => f.debug_struct("Blocked").finish_non_exhaustive(),
89        }
90    }
91}
92
93/// Describes a connection between two peers.
94///
95/// Links are unidirectional (and must be set up in both directions
96/// for a bidirectional connection).
97#[derive(Clone)]
98pub struct Link {
99    /// Mean latency for the delivery of a message.
100    pub latency: Duration,
101
102    /// Standard deviation of the latency for the delivery of a message.
103    pub jitter: Duration,
104
105    /// Probability of a message being delivered successfully (in range \[0,1\]).
106    pub success_rate: f64,
107}
108
109/// Interface for modifying the simulated network.
110///
111/// At any point, peers can be added/removed and links
112/// between said peers can be modified.
113#[derive(Debug)]
114pub struct Oracle<P: PublicKey, E: Clock> {
115    sender: UnboundedMailbox<Message<P, E>>,
116}
117
118impl<P: PublicKey, E: Clock> Clone for Oracle<P, E> {
119    fn clone(&self) -> Self {
120        Self {
121            sender: self.sender.clone(),
122        }
123    }
124}
125
126impl<P: PublicKey, E: Clock> Oracle<P, E> {
127    /// Create a new instance of the oracle.
128    pub(crate) const fn new(sender: UnboundedMailbox<Message<P, E>>) -> Self {
129        Self { sender }
130    }
131
132    /// Create a new [Control] interface for some peer.
133    pub fn control(&self, me: P) -> Control<P, E> {
134        Control {
135            me,
136            sender: self.sender.clone(),
137        }
138    }
139
140    /// Create a new [Manager].
141    ///
142    /// Useful for mocking [crate::authenticated::discovery].
143    pub fn manager(&self) -> Manager<P, E> {
144        Manager {
145            oracle: self.clone(),
146        }
147    }
148
149    /// Create a new [SocketManager].
150    ///
151    /// Useful for mocking [crate::authenticated::lookup].
152    pub fn socket_manager(&self) -> SocketManager<P, E> {
153        SocketManager {
154            oracle: self.clone(),
155        }
156    }
157
158    /// Return a list of all blocked peers.
159    pub async fn blocked(&self) -> Result<Vec<(P, P)>, Error> {
160        self.sender
161            .0
162            .request(|result| Message::Blocked { result })
163            .await
164            .ok_or(Error::NetworkClosed)?
165    }
166
167    /// Set bandwidth limits for a peer.
168    ///
169    /// Bandwidth is specified for the peer's egress (upload) and ingress (download)
170    /// rates in bytes per second. Use `None` for unlimited bandwidth.
171    ///
172    /// Bandwidth can be specified before a peer is registered or linked.
173    pub async fn limit_bandwidth(
174        &self,
175        public_key: P,
176        egress_cap: Option<usize>,
177        ingress_cap: Option<usize>,
178    ) -> Result<(), Error> {
179        self.sender
180            .0
181            .request(|result| Message::LimitBandwidth {
182                public_key,
183                egress_cap,
184                ingress_cap,
185                result,
186            })
187            .await
188            .ok_or(Error::NetworkClosed)
189    }
190
191    /// Create a unidirectional link between two peers.
192    ///
193    /// Link can be called multiple times for the same sender/receiver. The latest
194    /// setting will be used.
195    ///
196    /// Link can be called before a peer is registered or bandwidth is specified.
197    pub async fn add_link(&self, sender: P, receiver: P, config: Link) -> Result<(), Error> {
198        // Sanity checks
199        if sender == receiver {
200            return Err(Error::LinkingSelf);
201        }
202        if config.success_rate < 0.0 || config.success_rate > 1.0 {
203            return Err(Error::InvalidSuccessRate(config.success_rate));
204        }
205
206        // Convert Duration to milliseconds as f64 for the Normal distribution
207        let latency_ms = config.latency.as_secs_f64() * 1000.0;
208        let jitter_ms = config.jitter.as_secs_f64() * 1000.0;
209
210        // Create distribution
211        let sampler = Normal::new(latency_ms, jitter_ms).unwrap();
212
213        self.sender
214            .0
215            .request(|result| Message::AddLink {
216                sender,
217                receiver,
218                sampler,
219                success_rate: config.success_rate,
220                result,
221            })
222            .await
223            .ok_or(Error::NetworkClosed)?
224    }
225
226    /// Remove a unidirectional link between two peers.
227    ///
228    /// If no link exists, this will return an error.
229    pub async fn remove_link(&self, sender: P, receiver: P) -> Result<(), Error> {
230        // Sanity checks
231        if sender == receiver {
232            return Err(Error::LinkingSelf);
233        }
234
235        self.sender
236            .0
237            .request(|result| Message::RemoveLink {
238                sender,
239                receiver,
240                result,
241            })
242            .await
243            .ok_or(Error::NetworkClosed)?
244    }
245
246    /// Set the peers for a given id.
247    async fn update(&self, id: u64, peers: Set<P>) {
248        self.sender.0.send_lossy(Message::Update { id, peers });
249    }
250
251    /// Get the peers for a given id.
252    async fn peer_set(&self, id: u64) -> Option<Set<P>> {
253        self.sender
254            .0
255            .request(|response| Message::PeerSet { id, response })
256            .await
257            .flatten()
258    }
259
260    /// Subscribe to notifications when new peer sets are added.
261    async fn subscribe(&self) -> mpsc::UnboundedReceiver<(u64, Set<P>, Set<P>)> {
262        let (sender, receiver) = mpsc::unbounded();
263        self.sender.0.send_lossy(Message::Subscribe { sender });
264        receiver
265    }
266}
267
268/// Implementation of [crate::Manager] for peers.
269///
270/// Useful for mocking [crate::authenticated::discovery].
271pub struct Manager<P: PublicKey, E: Clock> {
272    /// The oracle to send messages to.
273    oracle: Oracle<P, E>,
274}
275
276impl<P: PublicKey, E: Clock> std::fmt::Debug for Manager<P, E> {
277    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278        f.debug_struct("Manager").finish_non_exhaustive()
279    }
280}
281
282impl<P: PublicKey, E: Clock> Clone for Manager<P, E> {
283    fn clone(&self) -> Self {
284        Self {
285            oracle: self.oracle.clone(),
286        }
287    }
288}
289
290impl<P: PublicKey, E: Clock> crate::Manager for Manager<P, E> {
291    type PublicKey = P;
292    type Peers = Set<Self::PublicKey>;
293
294    async fn update(&mut self, id: u64, peers: Self::Peers) {
295        self.oracle.update(id, peers).await;
296    }
297
298    async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
299        self.oracle.peer_set(id).await
300    }
301
302    async fn subscribe(
303        &mut self,
304    ) -> mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)> {
305        self.oracle.subscribe().await
306    }
307}
308
309/// Implementation of [crate::Manager] for peers with [Address]es.
310///
311/// Useful for mocking [crate::authenticated::lookup].
312///
313/// # Note on [Address]
314///
315/// Because addresses are never exposed in [crate::simulated],
316/// there is nothing to assert submitted data against. We thus consider
317/// all addresses to be valid.
318pub struct SocketManager<P: PublicKey, E: Clock> {
319    /// The oracle to send messages to.
320    oracle: Oracle<P, E>,
321}
322
323impl<P: PublicKey, E: Clock> std::fmt::Debug for SocketManager<P, E> {
324    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
325        f.debug_struct("SocketManager").finish_non_exhaustive()
326    }
327}
328
329impl<P: PublicKey, E: Clock> Clone for SocketManager<P, E> {
330    fn clone(&self) -> Self {
331        Self {
332            oracle: self.oracle.clone(),
333        }
334    }
335}
336
337impl<P: PublicKey, E: Clock> crate::Manager for SocketManager<P, E> {
338    type PublicKey = P;
339    type Peers = Map<Self::PublicKey, Address>;
340
341    async fn update(&mut self, id: u64, peers: Self::Peers) {
342        // Ignore all addresses (simulated network doesn't use them)
343        self.oracle.update(id, peers.into_keys()).await;
344    }
345
346    async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
347        self.oracle.peer_set(id).await
348    }
349
350    async fn subscribe(
351        &mut self,
352    ) -> mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)> {
353        self.oracle.subscribe().await
354    }
355}
356
357/// Individual control interface for a peer in the simulated network.
358#[derive(Debug)]
359pub struct Control<P: PublicKey, E: Clock> {
360    /// The public key of the peer this control interface is for.
361    me: P,
362
363    /// Sender for messages to the oracle.
364    sender: UnboundedMailbox<Message<P, E>>,
365}
366
367impl<P: PublicKey, E: Clock> Clone for Control<P, E> {
368    fn clone(&self) -> Self {
369        Self {
370            me: self.me.clone(),
371            sender: self.sender.clone(),
372        }
373    }
374}
375
376impl<P: PublicKey, E: Clock> Control<P, E> {
377    /// Register the communication interfaces for the peer over a given [Channel].
378    ///
379    /// # Rate Limiting
380    ///
381    /// The `quota` parameter specifies the rate limit for outbound messages to each peer.
382    /// Recipients that exceed their rate limit will be skipped when sending.
383    pub async fn register(
384        &self,
385        channel: Channel,
386        quota: Quota,
387    ) -> Result<(Sender<P, E>, Receiver<P>), Error> {
388        let public_key = self.me.clone();
389        self.sender
390            .0
391            .request(|result| Message::Register {
392                channel,
393                public_key,
394                quota,
395                result,
396            })
397            .await
398            .ok_or(Error::NetworkClosed)?
399    }
400}
401
402impl<P: PublicKey, E: Clock> crate::Blocker for Control<P, E> {
403    type PublicKey = P;
404
405    async fn block(&mut self, public_key: P) {
406        self.sender.0.send_lossy(Message::Block {
407            from: self.me.clone(),
408            to: public_key,
409        });
410    }
411}