commonware_p2p/simulated/
ingress.rs

1use super::{Error, Receiver, Sender};
2use crate::Channel;
3use commonware_utils::Array;
4use futures::{
5    channel::{mpsc, oneshot},
6    SinkExt,
7};
8use rand_distr::Normal;
9
10pub enum Message<P: Array> {
11    Register {
12        public_key: P,
13        channel: Channel,
14        #[allow(clippy::type_complexity)]
15        result: oneshot::Sender<Result<(Sender<P>, Receiver<P>), Error>>,
16    },
17    AddLink {
18        sender: P,
19        receiver: P,
20        sampler: Normal<f64>,
21        success_rate: f64,
22        result: oneshot::Sender<Result<(), Error>>,
23    },
24    RemoveLink {
25        sender: P,
26        receiver: P,
27        result: oneshot::Sender<Result<(), Error>>,
28    },
29}
30
/// Describes a connection between two peers.
///
/// Links are unidirectional (and must be set up in both directions
/// for a bidirectional connection).
///
/// `Debug` is derived so that link configurations can be printed in logs
/// and in test failure output.
#[derive(Clone, Debug)]
pub struct Link {
    /// Mean latency for the delivery of a message in milliseconds.
    ///
    /// Must not be negative.
    pub latency: f64,

    /// Standard deviation of the latency for the delivery of a message in milliseconds.
    ///
    /// Must not be negative.
    pub jitter: f64,

    /// Probability of a message being delivered successfully (in range \[0,1\]).
    pub success_rate: f64,
}
46
47/// Interface for modifying the simulated network.
48///
49/// At any point, peers can be added/removed and links
50/// between said peers can be modified.
51#[derive(Clone)]
52pub struct Oracle<P: Array> {
53    sender: mpsc::UnboundedSender<Message<P>>,
54}
55
56impl<P: Array> Oracle<P> {
57    pub(crate) fn new(sender: mpsc::UnboundedSender<Message<P>>) -> Self {
58        Self { sender }
59    }
60
61    /// Register a new peer with the network that can interact over a given channel.
62    ///
63    /// By default, the peer will not be linked to any other peers. If a peer is already
64    /// registered on a given channel, it will return an error.
65    pub async fn register(
66        &mut self,
67        public_key: P,
68        channel: Channel,
69    ) -> Result<(Sender<P>, Receiver<P>), Error> {
70        let (sender, receiver) = oneshot::channel();
71        self.sender
72            .send(Message::Register {
73                public_key,
74                channel,
75                result: sender,
76            })
77            .await
78            .map_err(|_| Error::NetworkClosed)?;
79        receiver.await.map_err(|_| Error::NetworkClosed)?
80    }
81
82    /// Create a unidirectional link between two peers.
83    ///
84    /// Link can be called multiple times for the same sender/receiver. The latest
85    /// setting will be used.
86    pub async fn add_link(&mut self, sender: P, receiver: P, config: Link) -> Result<(), Error> {
87        // Sanity checks
88        if sender == receiver {
89            return Err(Error::LinkingSelf);
90        }
91        if config.success_rate < 0.0 || config.success_rate > 1.0 {
92            return Err(Error::InvalidSuccessRate(config.success_rate));
93        }
94        if config.latency < 0.0 || config.jitter < 0.0 {
95            return Err(Error::InvalidBehavior(config.latency, config.jitter));
96        }
97
98        // Create distribution
99        let sampler = Normal::new(config.latency, config.jitter)
100            .map_err(|_| Error::InvalidBehavior(config.latency, config.jitter))?;
101
102        // Wait for update to complete
103        let (s, r) = oneshot::channel();
104        self.sender
105            .send(Message::AddLink {
106                sender,
107                receiver,
108                sampler,
109                success_rate: config.success_rate,
110                result: s,
111            })
112            .await
113            .map_err(|_| Error::NetworkClosed)?;
114        r.await.map_err(|_| Error::NetworkClosed)?
115    }
116
117    /// Remove a unidirectional link between two peers.
118    ///
119    /// If no link exists, this will return an error.
120    pub async fn remove_link(&mut self, sender: P, receiver: P) -> Result<(), Error> {
121        // Sanity checks
122        if sender == receiver {
123            return Err(Error::LinkingSelf);
124        }
125
126        // Wait for update to complete
127        let (s, r) = oneshot::channel();
128        self.sender
129            .send(Message::RemoveLink {
130                sender,
131                receiver,
132                result: s,
133            })
134            .await
135            .map_err(|_| Error::NetworkClosed)?;
136        r.await.map_err(|_| Error::NetworkClosed)?
137    }
138}