Skip to main content

commonware_p2p/simulated/
ingress.rs

1use super::{Error, Receiver, Sender};
2use crate::{authenticated::UnboundedMailbox, Address, Channel};
3use commonware_cryptography::PublicKey;
4use commonware_runtime::{Clock, Quota};
5use commonware_utils::{
6    channel::{fallible::FallibleExt, mpsc, oneshot, ring},
7    ordered::{Map, Set},
8};
9use rand_distr::Normal;
10use std::time::Duration;
11
12pub enum Message<P: PublicKey, E: Clock> {
13    Register {
14        channel: Channel,
15        public_key: P,
16        quota: Quota,
17        #[allow(clippy::type_complexity)]
18        result: oneshot::Sender<Result<(Sender<P, E>, Receiver<P>), Error>>,
19    },
20    Track {
21        id: u64,
22        peers: Set<P>,
23    },
24    PeerSet {
25        id: u64,
26        response: oneshot::Sender<Option<Set<P>>>,
27    },
28    Subscribe {
29        sender: mpsc::UnboundedSender<(u64, Set<P>, Set<P>)>,
30    },
31    SubscribeConnected {
32        response: oneshot::Sender<ring::Receiver<Vec<P>>>,
33    },
34    LimitBandwidth {
35        public_key: P,
36        egress_cap: Option<usize>,
37        ingress_cap: Option<usize>,
38        result: oneshot::Sender<()>,
39    },
40    AddLink {
41        sender: P,
42        receiver: P,
43        sampler: Normal<f64>,
44        success_rate: f64,
45        result: oneshot::Sender<Result<(), Error>>,
46    },
47    RemoveLink {
48        sender: P,
49        receiver: P,
50        result: oneshot::Sender<Result<(), Error>>,
51    },
52    Block {
53        /// The public key of the peer sending the block request.
54        from: P,
55        /// The public key of the peer to block.
56        to: P,
57    },
58    Blocked {
59        result: oneshot::Sender<Result<Vec<(P, P)>, Error>>,
60    },
61}
62
63impl<P: PublicKey, E: Clock> std::fmt::Debug for Message<P, E> {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        match self {
66            Self::Register { .. } => f.debug_struct("Register").finish_non_exhaustive(),
67            Self::Track { id, .. } => f
68                .debug_struct("Track")
69                .field("id", id)
70                .finish_non_exhaustive(),
71            Self::PeerSet { id, .. } => f
72                .debug_struct("PeerSet")
73                .field("id", id)
74                .finish_non_exhaustive(),
75            Self::Subscribe { .. } => f.debug_struct("Subscribe").finish_non_exhaustive(),
76            Self::SubscribeConnected { .. } => {
77                f.debug_struct("SubscribeConnected").finish_non_exhaustive()
78            }
79            Self::LimitBandwidth { .. } => f.debug_struct("LimitBandwidth").finish_non_exhaustive(),
80            Self::AddLink { .. } => f.debug_struct("AddLink").finish_non_exhaustive(),
81            Self::RemoveLink { .. } => f.debug_struct("RemoveLink").finish_non_exhaustive(),
82            Self::Block { from, to } => f
83                .debug_struct("Block")
84                .field("from", from)
85                .field("to", to)
86                .finish(),
87            Self::Blocked { .. } => f.debug_struct("Blocked").finish_non_exhaustive(),
88        }
89    }
90}
91
92/// Describes a connection between two peers.
93///
94/// Links are unidirectional (and must be set up in both directions
95/// for a bidirectional connection).
96#[derive(Clone)]
97pub struct Link {
98    /// Mean latency for the delivery of a message.
99    pub latency: Duration,
100
101    /// Standard deviation of the latency for the delivery of a message.
102    pub jitter: Duration,
103
104    /// Probability of a message being delivered successfully (in range \[0,1\]).
105    pub success_rate: f64,
106}
107
108/// Interface for modifying the simulated network.
109///
110/// At any point, peers can be added/removed and links
111/// between said peers can be modified.
112#[derive(Debug)]
113pub struct Oracle<P: PublicKey, E: Clock> {
114    sender: UnboundedMailbox<Message<P, E>>,
115}
116
117impl<P: PublicKey, E: Clock> Clone for Oracle<P, E> {
118    fn clone(&self) -> Self {
119        Self {
120            sender: self.sender.clone(),
121        }
122    }
123}
124
125impl<P: PublicKey, E: Clock> Oracle<P, E> {
126    /// Create a new instance of the oracle.
127    pub(crate) const fn new(sender: UnboundedMailbox<Message<P, E>>) -> Self {
128        Self { sender }
129    }
130
131    /// Create a new [Control] interface for some peer.
132    pub fn control(&self, me: P) -> Control<P, E> {
133        Control {
134            me,
135            sender: self.sender.clone(),
136        }
137    }
138
139    /// Create a new [Manager].
140    ///
141    /// Useful for mocking [crate::authenticated::discovery].
142    pub fn manager(&self) -> Manager<P, E> {
143        Manager {
144            oracle: self.clone(),
145        }
146    }
147
148    /// Create a new [SocketManager].
149    ///
150    /// Useful for mocking [crate::authenticated::lookup].
151    pub fn socket_manager(&self) -> SocketManager<P, E> {
152        SocketManager {
153            oracle: self.clone(),
154        }
155    }
156
157    /// Return a list of all blocked peers.
158    pub async fn blocked(&self) -> Result<Vec<(P, P)>, Error> {
159        self.sender
160            .0
161            .request(|result| Message::Blocked { result })
162            .await
163            .ok_or(Error::NetworkClosed)?
164    }
165
166    /// Set bandwidth limits for a peer.
167    ///
168    /// Bandwidth is specified for the peer's egress (upload) and ingress (download)
169    /// rates in bytes per second. Use `None` for unlimited bandwidth.
170    ///
171    /// Bandwidth can be specified before a peer is registered or linked.
172    pub async fn limit_bandwidth(
173        &self,
174        public_key: P,
175        egress_cap: Option<usize>,
176        ingress_cap: Option<usize>,
177    ) -> Result<(), Error> {
178        self.sender
179            .0
180            .request(|result| Message::LimitBandwidth {
181                public_key,
182                egress_cap,
183                ingress_cap,
184                result,
185            })
186            .await
187            .ok_or(Error::NetworkClosed)
188    }
189
190    /// Create a unidirectional link between two peers.
191    ///
192    /// Link can be called multiple times for the same sender/receiver. The latest
193    /// setting will be used.
194    ///
195    /// Link can be called before a peer is registered or bandwidth is specified.
196    pub async fn add_link(&self, sender: P, receiver: P, config: Link) -> Result<(), Error> {
197        // Sanity checks
198        if sender == receiver {
199            return Err(Error::LinkingSelf);
200        }
201        if config.success_rate < 0.0 || config.success_rate > 1.0 {
202            return Err(Error::InvalidSuccessRate(config.success_rate));
203        }
204
205        // Convert Duration to milliseconds as f64 for the Normal distribution
206        let latency_ms = config.latency.as_secs_f64() * 1000.0;
207        let jitter_ms = config.jitter.as_secs_f64() * 1000.0;
208
209        // Create distribution
210        let sampler = Normal::new(latency_ms, jitter_ms).unwrap();
211
212        self.sender
213            .0
214            .request(|result| Message::AddLink {
215                sender,
216                receiver,
217                sampler,
218                success_rate: config.success_rate,
219                result,
220            })
221            .await
222            .ok_or(Error::NetworkClosed)?
223    }
224
225    /// Remove a unidirectional link between two peers.
226    ///
227    /// If no link exists, this will return an error.
228    pub async fn remove_link(&self, sender: P, receiver: P) -> Result<(), Error> {
229        // Sanity checks
230        if sender == receiver {
231            return Err(Error::LinkingSelf);
232        }
233
234        self.sender
235            .0
236            .request(|result| Message::RemoveLink {
237                sender,
238                receiver,
239                result,
240            })
241            .await
242            .ok_or(Error::NetworkClosed)?
243    }
244
245    /// Set the peers for a given id.
246    async fn track(&self, id: u64, peers: Set<P>) {
247        self.sender.0.send_lossy(Message::Track { id, peers });
248    }
249
250    /// Get the peers for a given id.
251    async fn peer_set(&self, id: u64) -> Option<Set<P>> {
252        self.sender
253            .0
254            .request(|response| Message::PeerSet { id, response })
255            .await
256            .flatten()
257    }
258
259    /// Subscribe to notifications when new peer sets are added.
260    async fn subscribe(&self) -> mpsc::UnboundedReceiver<(u64, Set<P>, Set<P>)> {
261        let (sender, receiver) = mpsc::unbounded_channel();
262        self.sender.0.send_lossy(Message::Subscribe { sender });
263        receiver
264    }
265}
266
267/// Implementation of [crate::Manager] for peers.
268///
269/// Useful for mocking [crate::authenticated::discovery].
270pub struct Manager<P: PublicKey, E: Clock> {
271    /// The oracle to send messages to.
272    oracle: Oracle<P, E>,
273}
274
275impl<P: PublicKey, E: Clock> std::fmt::Debug for Manager<P, E> {
276    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
277        f.debug_struct("Manager").finish_non_exhaustive()
278    }
279}
280
281impl<P: PublicKey, E: Clock> Clone for Manager<P, E> {
282    fn clone(&self) -> Self {
283        Self {
284            oracle: self.oracle.clone(),
285        }
286    }
287}
288
289impl<P: PublicKey, E: Clock> crate::Provider for Manager<P, E> {
290    type PublicKey = P;
291
292    async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
293        self.oracle.peer_set(id).await
294    }
295
296    async fn subscribe(
297        &mut self,
298    ) -> mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)> {
299        self.oracle.subscribe().await
300    }
301}
302
303impl<P: PublicKey, E: Clock> crate::Manager for Manager<P, E> {
304    async fn track(&mut self, id: u64, peers: Set<Self::PublicKey>) {
305        self.oracle.track(id, peers).await;
306    }
307}
308
309/// Implementation of [crate::AddressableManager] for peers with [Address]es.
310///
311/// Useful for mocking [crate::authenticated::lookup].
312///
313/// # Note on [Address]
314///
315/// Because addresses are never exposed in [crate::simulated],
316/// there is nothing to assert submitted data against. We thus consider
317/// all addresses to be valid.
318pub struct SocketManager<P: PublicKey, E: Clock> {
319    /// The oracle to send messages to.
320    oracle: Oracle<P, E>,
321}
322
323impl<P: PublicKey, E: Clock> std::fmt::Debug for SocketManager<P, E> {
324    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
325        f.debug_struct("SocketManager").finish_non_exhaustive()
326    }
327}
328
329impl<P: PublicKey, E: Clock> Clone for SocketManager<P, E> {
330    fn clone(&self) -> Self {
331        Self {
332            oracle: self.oracle.clone(),
333        }
334    }
335}
336
337impl<P: PublicKey, E: Clock> crate::Provider for SocketManager<P, E> {
338    type PublicKey = P;
339
340    async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
341        self.oracle.peer_set(id).await
342    }
343
344    async fn subscribe(
345        &mut self,
346    ) -> mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)> {
347        self.oracle.subscribe().await
348    }
349}
350
351impl<P: PublicKey, E: Clock> crate::AddressableManager for SocketManager<P, E> {
352    async fn track(&mut self, id: u64, peers: Map<Self::PublicKey, Address>) {
353        // Ignore all addresses (simulated network doesn't use them)
354        self.oracle.track(id, peers.into_keys()).await;
355    }
356
357    async fn overwrite(&mut self, _peers: Map<Self::PublicKey, Address>) {
358        // We consider all addresses to be valid, so this is a no-op
359    }
360}
361
362/// Individual control interface for a peer in the simulated network.
363#[derive(Debug)]
364pub struct Control<P: PublicKey, E: Clock> {
365    /// The public key of the peer this control interface is for.
366    me: P,
367
368    /// Sender for messages to the oracle.
369    sender: UnboundedMailbox<Message<P, E>>,
370}
371
372impl<P: PublicKey, E: Clock> Clone for Control<P, E> {
373    fn clone(&self) -> Self {
374        Self {
375            me: self.me.clone(),
376            sender: self.sender.clone(),
377        }
378    }
379}
380
381impl<P: PublicKey, E: Clock> Control<P, E> {
382    /// Register the communication interfaces for the peer over a given [Channel].
383    ///
384    /// # Rate Limiting
385    ///
386    /// The `quota` parameter specifies the rate limit for outbound messages to each peer.
387    /// Recipients that exceed their rate limit will be skipped when sending.
388    pub async fn register(
389        &self,
390        channel: Channel,
391        quota: Quota,
392    ) -> Result<(Sender<P, E>, Receiver<P>), Error> {
393        let public_key = self.me.clone();
394        self.sender
395            .0
396            .request(|result| Message::Register {
397                channel,
398                public_key,
399                quota,
400                result,
401            })
402            .await
403            .ok_or(Error::NetworkClosed)?
404    }
405}
406
407impl<P: PublicKey, E: Clock> crate::Blocker for Control<P, E> {
408    type PublicKey = P;
409
410    async fn block(&mut self, public_key: P) {
411        self.sender.0.send_lossy(Message::Block {
412            from: self.me.clone(),
413            to: public_key,
414        });
415    }
416}