Skip to main content

commonware_p2p/simulated/
ingress.rs

1use super::{Error, Receiver, Sender};
2use crate::{
3    authenticated::UnboundedMailbox, Address, AddressableTrackedPeers, Channel,
4    PeerSetSubscription, TrackedPeers,
5};
6use commonware_cryptography::PublicKey;
7use commonware_runtime::{Clock, Quota};
8use commonware_utils::{
9    channel::{fallible::FallibleExt, mpsc, oneshot, ring},
10    ordered::Map,
11};
12use rand_distr::Normal;
13use std::time::Duration;
14
15pub enum Message<P: PublicKey, E: Clock> {
16    Register {
17        channel: Channel,
18        public_key: P,
19        quota: Quota,
20        #[allow(clippy::type_complexity)]
21        result: oneshot::Sender<Result<(Sender<P, E>, Receiver<P>), Error>>,
22    },
23    Track {
24        id: u64,
25        peers: TrackedPeers<P>,
26    },
27    PeerSet {
28        id: u64,
29        response: oneshot::Sender<Option<TrackedPeers<P>>>,
30    },
31    Subscribe {
32        response: oneshot::Sender<PeerSetSubscription<P>>,
33    },
34    SubscribeConnected {
35        response: oneshot::Sender<ring::Receiver<Vec<P>>>,
36    },
37    LimitBandwidth {
38        public_key: P,
39        egress_cap: Option<usize>,
40        ingress_cap: Option<usize>,
41        result: oneshot::Sender<()>,
42    },
43    AddLink {
44        sender: P,
45        receiver: P,
46        sampler: Normal<f64>,
47        success_rate: f64,
48        result: oneshot::Sender<Result<(), Error>>,
49    },
50    RemoveLink {
51        sender: P,
52        receiver: P,
53        result: oneshot::Sender<Result<(), Error>>,
54    },
55    Block {
56        /// The public key of the peer sending the block request.
57        from: P,
58        /// The public key of the peer to block.
59        to: P,
60    },
61    Blocked {
62        result: oneshot::Sender<Result<Vec<(P, P)>, Error>>,
63    },
64}
65
66impl<P: PublicKey, E: Clock> std::fmt::Debug for Message<P, E> {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        match self {
69            Self::Register { .. } => f.debug_struct("Register").finish_non_exhaustive(),
70            Self::Track { id, .. } => f
71                .debug_struct("Track")
72                .field("id", id)
73                .finish_non_exhaustive(),
74            Self::PeerSet { id, .. } => f
75                .debug_struct("PeerSet")
76                .field("id", id)
77                .finish_non_exhaustive(),
78            Self::Subscribe { .. } => f.debug_struct("Subscribe").finish_non_exhaustive(),
79            Self::SubscribeConnected { .. } => {
80                f.debug_struct("SubscribeConnected").finish_non_exhaustive()
81            }
82            Self::LimitBandwidth { .. } => f.debug_struct("LimitBandwidth").finish_non_exhaustive(),
83            Self::AddLink { .. } => f.debug_struct("AddLink").finish_non_exhaustive(),
84            Self::RemoveLink { .. } => f.debug_struct("RemoveLink").finish_non_exhaustive(),
85            Self::Block { from, to } => f
86                .debug_struct("Block")
87                .field("from", from)
88                .field("to", to)
89                .finish(),
90            Self::Blocked { .. } => f.debug_struct("Blocked").finish_non_exhaustive(),
91        }
92    }
93}
94
95/// Describes a connection between two peers.
96///
97/// Links are unidirectional (and must be set up in both directions
98/// for a bidirectional connection).
99#[derive(Clone)]
100pub struct Link {
101    /// Mean latency for the delivery of a message.
102    pub latency: Duration,
103
104    /// Standard deviation of the latency for the delivery of a message.
105    pub jitter: Duration,
106
107    /// Probability of a message being delivered successfully (in range \[0,1\]).
108    pub success_rate: f64,
109}
110
111/// Interface for modifying the simulated network.
112///
113/// At any point, peers can be added/removed and links
114/// between said peers can be modified.
115#[derive(Debug)]
116pub struct Oracle<P: PublicKey, E: Clock> {
117    sender: UnboundedMailbox<Message<P, E>>,
118}
119
120impl<P: PublicKey, E: Clock> Clone for Oracle<P, E> {
121    fn clone(&self) -> Self {
122        Self {
123            sender: self.sender.clone(),
124        }
125    }
126}
127
128impl<P: PublicKey, E: Clock> Oracle<P, E> {
129    /// Create a new instance of the oracle.
130    pub(crate) const fn new(sender: UnboundedMailbox<Message<P, E>>) -> Self {
131        Self { sender }
132    }
133
134    /// Create a new [Control] interface for some peer.
135    pub fn control(&self, me: P) -> Control<P, E> {
136        Control {
137            me,
138            sender: self.sender.clone(),
139        }
140    }
141
142    /// Create a new [Manager].
143    ///
144    /// Useful for mocking [crate::authenticated::discovery].
145    pub fn manager(&self) -> Manager<P, E> {
146        Manager {
147            oracle: self.clone(),
148        }
149    }
150
151    /// Create a new [SocketManager].
152    ///
153    /// Useful for mocking [crate::authenticated::lookup].
154    pub fn socket_manager(&self) -> SocketManager<P, E> {
155        SocketManager {
156            oracle: self.clone(),
157        }
158    }
159
160    /// Return a list of all blocked peers.
161    pub async fn blocked(&self) -> Result<Vec<(P, P)>, Error> {
162        self.sender
163            .0
164            .request(|result| Message::Blocked { result })
165            .await
166            .ok_or(Error::NetworkClosed)?
167    }
168
169    /// Set bandwidth limits for a peer.
170    ///
171    /// Bandwidth is specified for the peer's egress (upload) and ingress (download)
172    /// rates in bytes per second. Use `None` for unlimited bandwidth.
173    ///
174    /// Bandwidth can be specified before a peer is registered or linked.
175    pub async fn limit_bandwidth(
176        &self,
177        public_key: P,
178        egress_cap: Option<usize>,
179        ingress_cap: Option<usize>,
180    ) -> Result<(), Error> {
181        self.sender
182            .0
183            .request(|result| Message::LimitBandwidth {
184                public_key,
185                egress_cap,
186                ingress_cap,
187                result,
188            })
189            .await
190            .ok_or(Error::NetworkClosed)
191    }
192
193    /// Create a unidirectional link between two peers.
194    ///
195    /// Link can be called multiple times for the same sender/receiver. The latest
196    /// setting will be used.
197    ///
198    /// Link can be called before a peer is registered or bandwidth is specified.
199    pub async fn add_link(&self, sender: P, receiver: P, config: Link) -> Result<(), Error> {
200        // Sanity checks
201        if sender == receiver {
202            return Err(Error::LinkingSelf);
203        }
204        if config.success_rate < 0.0 || config.success_rate > 1.0 {
205            return Err(Error::InvalidSuccessRate(config.success_rate));
206        }
207
208        // Convert Duration to milliseconds as f64 for the Normal distribution
209        let latency_ms = config.latency.as_secs_f64() * 1000.0;
210        let jitter_ms = config.jitter.as_secs_f64() * 1000.0;
211
212        // Create distribution
213        let sampler = Normal::new(latency_ms, jitter_ms).unwrap();
214
215        self.sender
216            .0
217            .request(|result| Message::AddLink {
218                sender,
219                receiver,
220                sampler,
221                success_rate: config.success_rate,
222                result,
223            })
224            .await
225            .ok_or(Error::NetworkClosed)?
226    }
227
228    /// Remove a unidirectional link between two peers.
229    ///
230    /// If no link exists, this will return an error.
231    pub async fn remove_link(&self, sender: P, receiver: P) -> Result<(), Error> {
232        // Sanity checks
233        if sender == receiver {
234            return Err(Error::LinkingSelf);
235        }
236
237        self.sender
238            .0
239            .request(|result| Message::RemoveLink {
240                sender,
241                receiver,
242                result,
243            })
244            .await
245            .ok_or(Error::NetworkClosed)?
246    }
247
248    /// Set the peers for a given id.
249    fn track(&self, id: u64, peers: TrackedPeers<P>) {
250        self.sender.0.send_lossy(Message::Track { id, peers });
251    }
252
253    /// Get the primary and secondary peers for a given ID.
254    async fn peer_set(&self, id: u64) -> Option<TrackedPeers<P>> {
255        self.sender
256            .0
257            .request(|response| Message::PeerSet { id, response })
258            .await
259            .flatten()
260    }
261
262    /// Subscribe to notifications when new peer sets are added.
263    async fn subscribe(&self) -> PeerSetSubscription<P> {
264        self.sender
265            .0
266            .request(|response| Message::Subscribe { response })
267            .await
268            .unwrap_or_else(|| {
269                let (_, rx) = mpsc::unbounded_channel();
270                rx
271            })
272    }
273}
274
275/// Implementation of [crate::Manager] for peers.
276///
277/// Useful for mocking [crate::authenticated::discovery].
278pub struct Manager<P: PublicKey, E: Clock> {
279    /// The oracle to send messages to.
280    oracle: Oracle<P, E>,
281}
282
283impl<P: PublicKey, E: Clock> std::fmt::Debug for Manager<P, E> {
284    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
285        f.debug_struct("Manager").finish_non_exhaustive()
286    }
287}
288
289impl<P: PublicKey, E: Clock> Clone for Manager<P, E> {
290    fn clone(&self) -> Self {
291        Self {
292            oracle: self.oracle.clone(),
293        }
294    }
295}
296
297impl<P: PublicKey, E: Clock> crate::Provider for Manager<P, E> {
298    type PublicKey = P;
299
300    async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
301        self.oracle.peer_set(id).await
302    }
303
304    async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
305        self.oracle.subscribe().await
306    }
307}
308
309impl<P: PublicKey, E: Clock> crate::Manager for Manager<P, E> {
310    async fn track<R>(&mut self, id: u64, peers: R)
311    where
312        R: Into<TrackedPeers<Self::PublicKey>> + Send,
313    {
314        self.oracle.track(id, peers.into());
315    }
316}
317
318/// Implementation of [crate::AddressableManager] for peers with [Address]es.
319///
320/// Useful for mocking [crate::authenticated::lookup].
321///
322/// # Note on [Address]
323///
324/// Because addresses are never exposed in [crate::simulated],
325/// there is nothing to assert submitted data against. We thus consider
326/// all addresses to be valid.
327pub struct SocketManager<P: PublicKey, E: Clock> {
328    /// The oracle to send messages to.
329    oracle: Oracle<P, E>,
330}
331
332impl<P: PublicKey, E: Clock> std::fmt::Debug for SocketManager<P, E> {
333    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
334        f.debug_struct("SocketManager").finish_non_exhaustive()
335    }
336}
337
338impl<P: PublicKey, E: Clock> Clone for SocketManager<P, E> {
339    fn clone(&self) -> Self {
340        Self {
341            oracle: self.oracle.clone(),
342        }
343    }
344}
345
346impl<P: PublicKey, E: Clock> crate::Provider for SocketManager<P, E> {
347    type PublicKey = P;
348
349    async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
350        self.oracle.peer_set(id).await
351    }
352
353    async fn subscribe(&mut self) -> PeerSetSubscription<P> {
354        self.oracle.subscribe().await
355    }
356}
357
358impl<P: PublicKey, E: Clock> crate::AddressableManager for SocketManager<P, E> {
359    async fn track<R>(&mut self, id: u64, peers: R)
360    where
361        R: Into<AddressableTrackedPeers<Self::PublicKey>> + Send,
362    {
363        // Ignore all addresses (simulated network doesn't use them)
364        let peers = peers.into();
365        self.oracle.track(
366            id,
367            TrackedPeers::new(peers.primary.into_keys(), peers.secondary.into_keys()),
368        );
369    }
370
371    async fn overwrite(&mut self, _peers: Map<Self::PublicKey, Address>) {
372        // We consider all addresses to be valid, so this is a no-op
373    }
374}
375
376/// Individual control interface for a peer in the simulated network.
377#[derive(Debug)]
378pub struct Control<P: PublicKey, E: Clock> {
379    /// The public key of the peer this control interface is for.
380    me: P,
381
382    /// Sender for messages to the oracle.
383    sender: UnboundedMailbox<Message<P, E>>,
384}
385
386impl<P: PublicKey, E: Clock> Clone for Control<P, E> {
387    fn clone(&self) -> Self {
388        Self {
389            me: self.me.clone(),
390            sender: self.sender.clone(),
391        }
392    }
393}
394
395impl<P: PublicKey, E: Clock> Control<P, E> {
396    /// Register the communication interfaces for the peer over a given [Channel].
397    ///
398    /// # Rate Limiting
399    ///
400    /// The `quota` parameter specifies the rate limit for outbound messages to each peer.
401    /// Recipients that exceed their rate limit will be skipped when sending.
402    pub async fn register(
403        &self,
404        channel: Channel,
405        quota: Quota,
406    ) -> Result<(Sender<P, E>, Receiver<P>), Error> {
407        let public_key = self.me.clone();
408        self.sender
409            .0
410            .request(|result| Message::Register {
411                channel,
412                public_key,
413                quota,
414                result,
415            })
416            .await
417            .ok_or(Error::NetworkClosed)?
418    }
419}
420
421impl<P: PublicKey, E: Clock> crate::Blocker for Control<P, E> {
422    type PublicKey = P;
423
424    async fn block(&mut self, public_key: P) {
425        self.sender.0.send_lossy(Message::Block {
426            from: self.me.clone(),
427            to: public_key,
428        });
429    }
430}