Skip to main content

commonware_p2p/simulated/
ingress.rs

1use super::{Error, Receiver, Sender};
2use crate::{
3    Address, AddressableTrackedPeers, Channel, PeerSetSubscription, Recipients, TrackedPeers,
4};
5use commonware_actor::Feedback;
6use commonware_cryptography::PublicKey;
7use commonware_runtime::{Clock, IoBuf, Quota};
8use commonware_utils::{
9    channel::{fallible::FallibleExt, mpsc, oneshot, ring},
10    ordered::Map,
11};
12use rand_distr::Normal;
13use std::time::Duration;
14
15pub enum Message<P: PublicKey, E: Clock> {
16    Register {
17        channel: Channel,
18        public_key: P,
19        quota: Quota,
20        #[allow(clippy::type_complexity)]
21        result: oneshot::Sender<Result<(Sender<P, E>, Receiver<P>), Error>>,
22    },
23    Send {
24        channel: Channel,
25        origin: P,
26        recipients: Recipients<P>,
27        message: IoBuf,
28        priority: bool,
29    },
30    Track {
31        id: u64,
32        peers: TrackedPeers<P>,
33    },
34    PeerSet {
35        id: u64,
36        response: oneshot::Sender<Option<TrackedPeers<P>>>,
37    },
38    Subscribe {
39        response: oneshot::Sender<PeerSetSubscription<P>>,
40    },
41    SubscribePeers {
42        exclude: P,
43        sender: ring::Sender<Vec<P>>,
44    },
45    LimitBandwidth {
46        public_key: P,
47        egress_cap: Option<usize>,
48        ingress_cap: Option<usize>,
49        result: oneshot::Sender<()>,
50    },
51    AddLink {
52        sender: P,
53        receiver: P,
54        sampler: Normal<f64>,
55        success_rate: f64,
56        result: oneshot::Sender<Result<(), Error>>,
57    },
58    RemoveLink {
59        sender: P,
60        receiver: P,
61        result: oneshot::Sender<Result<(), Error>>,
62    },
63    Block {
64        /// The public key of the peer sending the block request.
65        from: P,
66        /// The public key of the peer to block.
67        to: P,
68    },
69    Blocked {
70        result: oneshot::Sender<Result<Vec<(P, P)>, Error>>,
71    },
72}
73
74impl<P: PublicKey, E: Clock> std::fmt::Debug for Message<P, E> {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        match self {
77            Self::Register { .. } => f.debug_struct("Register").finish_non_exhaustive(),
78            Self::Send { .. } => f.debug_struct("Send").finish_non_exhaustive(),
79            Self::Track { id, .. } => f
80                .debug_struct("Track")
81                .field("id", id)
82                .finish_non_exhaustive(),
83            Self::PeerSet { id, .. } => f
84                .debug_struct("PeerSet")
85                .field("id", id)
86                .finish_non_exhaustive(),
87            Self::Subscribe { .. } => f.debug_struct("Subscribe").finish_non_exhaustive(),
88            Self::SubscribePeers { exclude, .. } => f
89                .debug_struct("SubscribePeers")
90                .field("exclude", exclude)
91                .finish_non_exhaustive(),
92            Self::LimitBandwidth { .. } => f.debug_struct("LimitBandwidth").finish_non_exhaustive(),
93            Self::AddLink { .. } => f.debug_struct("AddLink").finish_non_exhaustive(),
94            Self::RemoveLink { .. } => f.debug_struct("RemoveLink").finish_non_exhaustive(),
95            Self::Block { from, to } => f
96                .debug_struct("Block")
97                .field("from", from)
98                .field("to", to)
99                .finish(),
100            Self::Blocked { .. } => f.debug_struct("Blocked").finish_non_exhaustive(),
101        }
102    }
103}
104
105fn enqueue<P: PublicKey, E: Clock>(
106    sender: &mpsc::UnboundedSender<Message<P, E>>,
107    message: Message<P, E>,
108) -> Feedback {
109    if sender.send_lossy(message) {
110        Feedback::Ok
111    } else {
112        Feedback::Closed
113    }
114}
115
116async fn request<P, E, R, F>(
117    sender: &mpsc::UnboundedSender<Message<P, E>>,
118    make_msg: F,
119) -> Option<R>
120where
121    P: PublicKey,
122    E: Clock,
123    R: Send,
124    F: FnOnce(oneshot::Sender<R>) -> Message<P, E> + Send,
125{
126    sender.request(make_msg).await
127}
128
/// Parameters of a connection from one peer to another.
///
/// A link only covers a single direction; a bidirectional connection needs
/// one link per direction.
#[derive(Clone)]
pub struct Link {
    /// Average time it takes to deliver a message.
    pub latency: Duration,

    /// Standard deviation around `latency` for the delivery of a message.
    pub jitter: Duration,

    /// Chance that a message is delivered successfully (in range \[0,1\]).
    pub success_rate: f64,
}
144
145/// Interface for modifying the simulated network.
146///
147/// At any point, peers can be added/removed and links
148/// between said peers can be modified.
149#[derive(Debug)]
150pub struct Oracle<P: PublicKey, E: Clock> {
151    sender: mpsc::UnboundedSender<Message<P, E>>,
152}
153
154impl<P: PublicKey, E: Clock> Clone for Oracle<P, E> {
155    fn clone(&self) -> Self {
156        Self {
157            sender: self.sender.clone(),
158        }
159    }
160}
161
162impl<P: PublicKey, E: Clock> Oracle<P, E> {
163    /// Create a new instance of the oracle.
164    pub(crate) const fn new(sender: mpsc::UnboundedSender<Message<P, E>>) -> Self {
165        Self { sender }
166    }
167
168    /// Create a new [Control] interface for some peer.
169    pub fn control(&self, me: P) -> Control<P, E> {
170        Control {
171            me,
172            sender: self.sender.clone(),
173        }
174    }
175
176    /// Create a new [Manager].
177    ///
178    /// Useful for mocking [crate::authenticated::discovery].
179    pub fn manager(&self) -> Manager<P, E> {
180        Manager {
181            oracle: self.clone(),
182        }
183    }
184
185    /// Create a new [SocketManager].
186    ///
187    /// Useful for mocking [crate::authenticated::lookup].
188    pub fn socket_manager(&self) -> SocketManager<P, E> {
189        SocketManager {
190            oracle: self.clone(),
191        }
192    }
193
194    /// Return a list of all blocked peers.
195    pub async fn blocked(&self) -> Result<Vec<(P, P)>, Error> {
196        request(&self.sender, |result| Message::Blocked { result })
197            .await
198            .ok_or(Error::NetworkClosed)?
199    }
200
201    /// Set bandwidth limits for a peer.
202    ///
203    /// Bandwidth is specified for the peer's egress (upload) and ingress (download)
204    /// rates in bytes per second. Use `None` for unlimited bandwidth.
205    ///
206    /// Bandwidth can be specified before a peer is registered or linked.
207    pub async fn limit_bandwidth(
208        &self,
209        public_key: P,
210        egress_cap: Option<usize>,
211        ingress_cap: Option<usize>,
212    ) -> Result<(), Error> {
213        request(&self.sender, move |result| Message::LimitBandwidth {
214            public_key,
215            egress_cap,
216            ingress_cap,
217            result,
218        })
219        .await
220        .ok_or(Error::NetworkClosed)
221    }
222
223    /// Create a unidirectional link between two peers.
224    ///
225    /// Link can be called multiple times for the same sender/receiver. The latest
226    /// setting will be used.
227    ///
228    /// Link can be called before a peer is registered or bandwidth is specified.
229    pub async fn add_link(&self, sender: P, receiver: P, config: Link) -> Result<(), Error> {
230        // Sanity checks
231        if sender == receiver {
232            return Err(Error::LinkingSelf);
233        }
234        if config.success_rate < 0.0 || config.success_rate > 1.0 {
235            return Err(Error::InvalidSuccessRate(config.success_rate));
236        }
237
238        // Convert Duration to milliseconds as f64 for the Normal distribution
239        let latency_ms = config.latency.as_secs_f64() * 1000.0;
240        let jitter_ms = config.jitter.as_secs_f64() * 1000.0;
241
242        // Create distribution
243        let sampler = Normal::new(latency_ms, jitter_ms).unwrap();
244
245        request(&self.sender, move |result| Message::AddLink {
246            sender,
247            receiver,
248            sampler,
249            success_rate: config.success_rate,
250            result,
251        })
252        .await
253        .ok_or(Error::NetworkClosed)?
254    }
255
256    /// Remove a unidirectional link between two peers.
257    ///
258    /// If no link exists, this will return an error.
259    pub async fn remove_link(&self, sender: P, receiver: P) -> Result<(), Error> {
260        // Sanity checks
261        if sender == receiver {
262            return Err(Error::LinkingSelf);
263        }
264
265        request(&self.sender, move |result| Message::RemoveLink {
266            sender,
267            receiver,
268            result,
269        })
270        .await
271        .ok_or(Error::NetworkClosed)?
272    }
273
274    /// Set the peers for a given id.
275    fn track(&self, id: u64, peers: TrackedPeers<P>) -> Feedback {
276        enqueue(&self.sender, Message::Track { id, peers })
277    }
278
279    /// Get the primary and secondary peers for a given ID.
280    async fn peer_set(&self, id: u64) -> Option<TrackedPeers<P>> {
281        request(&self.sender, move |response| Message::PeerSet {
282            id,
283            response,
284        })
285        .await
286        .flatten()
287    }
288
289    /// Subscribe to notifications when new peer sets are added.
290    async fn subscribe(&self) -> PeerSetSubscription<P> {
291        request(&self.sender, |response| Message::Subscribe { response })
292            .await
293            .unwrap_or_else(|| {
294                let (_, rx) = mpsc::unbounded_channel();
295                rx
296            })
297    }
298}
299
300/// Implementation of [crate::Manager] for peers.
301///
302/// Useful for mocking [crate::authenticated::discovery].
303pub struct Manager<P: PublicKey, E: Clock> {
304    /// The oracle to send messages to.
305    oracle: Oracle<P, E>,
306}
307
308impl<P: PublicKey, E: Clock> std::fmt::Debug for Manager<P, E> {
309    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
310        f.debug_struct("Manager").finish_non_exhaustive()
311    }
312}
313
314impl<P: PublicKey, E: Clock> Clone for Manager<P, E> {
315    fn clone(&self) -> Self {
316        Self {
317            oracle: self.oracle.clone(),
318        }
319    }
320}
321
322impl<P: PublicKey, E: Clock> crate::Provider for Manager<P, E> {
323    type PublicKey = P;
324
325    async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
326        self.oracle.peer_set(id).await
327    }
328
329    async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
330        self.oracle.subscribe().await
331    }
332}
333
334impl<P: PublicKey, E: Clock> crate::Manager for Manager<P, E> {
335    fn track<R>(&mut self, id: u64, peers: R) -> Feedback
336    where
337        R: Into<TrackedPeers<Self::PublicKey>> + Send,
338    {
339        self.oracle.track(id, peers.into())
340    }
341}
342
343/// Implementation of [crate::AddressableManager] for peers with [Address]es.
344///
345/// Useful for mocking [crate::authenticated::lookup].
346///
347/// # Note on [Address]
348///
349/// Because addresses are never exposed in [crate::simulated],
350/// there is nothing to assert submitted data against. We thus consider
351/// all addresses to be valid.
352pub struct SocketManager<P: PublicKey, E: Clock> {
353    /// The oracle to send messages to.
354    oracle: Oracle<P, E>,
355}
356
357impl<P: PublicKey, E: Clock> std::fmt::Debug for SocketManager<P, E> {
358    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
359        f.debug_struct("SocketManager").finish_non_exhaustive()
360    }
361}
362
363impl<P: PublicKey, E: Clock> Clone for SocketManager<P, E> {
364    fn clone(&self) -> Self {
365        Self {
366            oracle: self.oracle.clone(),
367        }
368    }
369}
370
371impl<P: PublicKey, E: Clock> crate::Provider for SocketManager<P, E> {
372    type PublicKey = P;
373
374    async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
375        self.oracle.peer_set(id).await
376    }
377
378    async fn subscribe(&mut self) -> PeerSetSubscription<P> {
379        self.oracle.subscribe().await
380    }
381}
382
383impl<P: PublicKey, E: Clock> crate::AddressableManager for SocketManager<P, E> {
384    fn track<R>(&mut self, id: u64, peers: R) -> Feedback
385    where
386        R: Into<AddressableTrackedPeers<Self::PublicKey>> + Send,
387    {
388        // Ignore all addresses (simulated network doesn't use them)
389        let peers = peers.into();
390        self.oracle.track(
391            id,
392            TrackedPeers::new(peers.primary.into_keys(), peers.secondary.into_keys()),
393        )
394    }
395
396    fn overwrite(&mut self, _peers: Map<Self::PublicKey, Address>) -> Feedback {
397        // We consider all addresses to be valid, so this is a no-op
398        Feedback::Ok
399    }
400}
401
402/// Individual control interface for a peer in the simulated network.
403#[derive(Debug)]
404pub struct Control<P: PublicKey, E: Clock> {
405    /// The public key of the peer this control interface is for.
406    me: P,
407
408    /// Sender for messages to the oracle.
409    sender: mpsc::UnboundedSender<Message<P, E>>,
410}
411
412impl<P: PublicKey, E: Clock> Clone for Control<P, E> {
413    fn clone(&self) -> Self {
414        Self {
415            me: self.me.clone(),
416            sender: self.sender.clone(),
417        }
418    }
419}
420
421impl<P: PublicKey, E: Clock> Control<P, E> {
422    /// Register the communication interfaces for the peer over a given [Channel].
423    ///
424    /// # Rate Limiting
425    ///
426    /// The `quota` parameter specifies the rate limit for outbound messages to each peer.
427    /// Recipients that exceed their rate limit will be skipped when sending.
428    pub async fn register(
429        &self,
430        channel: Channel,
431        quota: Quota,
432    ) -> Result<(Sender<P, E>, Receiver<P>), Error> {
433        let public_key = self.me.clone();
434        request(&self.sender, move |result| Message::Register {
435            channel,
436            public_key,
437            quota,
438            result,
439        })
440        .await
441        .ok_or(Error::NetworkClosed)?
442    }
443}
444
445impl<P: PublicKey, E: Clock> crate::Blocker for Control<P, E> {
446    type PublicKey = P;
447
448    fn block(&mut self, public_key: P) -> Feedback {
449        enqueue(
450            &self.sender,
451            Message::Block {
452                from: self.me.clone(),
453                to: public_key,
454            },
455        )
456    }
457}