commonware_p2p/simulated/
ingress.rs

1use super::{Error, Receiver, Sender};
2use crate::Channel;
3use commonware_cryptography::PublicKey;
4use futures::{
5    channel::{mpsc, oneshot},
6    SinkExt,
7};
8use rand_distr::Normal;
9use std::time::Duration;
10
11pub enum Message<P: PublicKey> {
12    Register {
13        public_key: P,
14        channel: Channel,
15        #[allow(clippy::type_complexity)]
16        result: oneshot::Sender<Result<(Sender<P>, Receiver<P>), Error>>,
17    },
18    SetBandwidth {
19        public_key: P,
20        egress_bps: usize,
21        ingress_bps: usize,
22        result: oneshot::Sender<Result<(), Error>>,
23    },
24    AddLink {
25        sender: P,
26        receiver: P,
27        sampler: Normal<f64>,
28        success_rate: f64,
29        result: oneshot::Sender<Result<(), Error>>,
30    },
31    RemoveLink {
32        sender: P,
33        receiver: P,
34        result: oneshot::Sender<Result<(), Error>>,
35    },
36    Block {
37        /// The public key of the peer sending the block request.
38        from: P,
39        /// The public key of the peer to block.
40        to: P,
41    },
42    Blocked {
43        result: oneshot::Sender<Result<Vec<(P, P)>, Error>>,
44    },
45}
46
/// Describes a connection between two peers.
///
/// Links are unidirectional (and must be set up in both directions
/// for a bidirectional connection).
///
/// `latency` and `jitter` are used as the mean and standard deviation of the
/// normal distribution from which delivery delays are drawn.
#[derive(Clone, Debug)]
pub struct Link {
    /// Mean latency for the delivery of a message.
    pub latency: Duration,

    /// Standard deviation of the latency for the delivery of a message.
    pub jitter: Duration,

    /// Probability of a message being delivered successfully (in range `[0, 1]`).
    pub success_rate: f64,
}
62
63/// Interface for modifying the simulated network.
64///
65/// At any point, peers can be added/removed and links
66/// between said peers can be modified.
67#[derive(Clone)]
68pub struct Oracle<P: PublicKey> {
69    sender: mpsc::UnboundedSender<Message<P>>,
70}
71
72impl<P: PublicKey> Oracle<P> {
73    /// Create a new instance of the oracle.
74    pub(crate) fn new(sender: mpsc::UnboundedSender<Message<P>>) -> Self {
75        Self { sender }
76    }
77
78    /// Spawn an individual control interface for a peer in the simulated network.
79    pub fn control(&self, me: P) -> Control<P> {
80        Control {
81            me,
82            sender: self.sender.clone(),
83        }
84    }
85
86    /// Return a list of all blocked peers.
87    pub async fn blocked(&mut self) -> Result<Vec<(P, P)>, Error> {
88        let (s, r) = oneshot::channel();
89        self.sender
90            .send(Message::Blocked { result: s })
91            .await
92            .map_err(|_| Error::NetworkClosed)?;
93        r.await.map_err(|_| Error::NetworkClosed)?
94    }
95
96    /// Register a new peer with the network that can interact over a given channel.
97    ///
98    /// By default, the peer will not be linked to any other peers. If a peer is already
99    /// registered on a given channel, it will return an error.
100    pub async fn register(
101        &mut self,
102        public_key: P,
103        channel: Channel,
104    ) -> Result<(Sender<P>, Receiver<P>), Error> {
105        let (sender, receiver) = oneshot::channel();
106        self.sender
107            .send(Message::Register {
108                public_key,
109                channel,
110                result: sender,
111            })
112            .await
113            .map_err(|_| Error::NetworkClosed)?;
114        receiver.await.map_err(|_| Error::NetworkClosed)?
115    }
116
117    /// Set bandwidth limits for a peer.
118    ///
119    /// Bandwidth is specified for the peer's egress (upload) and ingress (download)
120    /// rates in bytes per second. Use `usize::MAX` for unlimited bandwidth.
121    pub async fn set_bandwidth(
122        &mut self,
123        public_key: P,
124        egress_bps: usize,
125        ingress_bps: usize,
126    ) -> Result<(), Error> {
127        let (sender, receiver) = oneshot::channel();
128        self.sender
129            .send(Message::SetBandwidth {
130                public_key,
131                egress_bps,
132                ingress_bps,
133                result: sender,
134            })
135            .await
136            .map_err(|_| Error::NetworkClosed)?;
137        receiver.await.map_err(|_| Error::NetworkClosed)?
138    }
139
140    /// Create a unidirectional link between two peers.
141    ///
142    /// Link can be called multiple times for the same sender/receiver. The latest
143    /// setting will be used.
144    pub async fn add_link(&mut self, sender: P, receiver: P, config: Link) -> Result<(), Error> {
145        // Sanity checks
146        if sender == receiver {
147            return Err(Error::LinkingSelf);
148        }
149        if config.success_rate < 0.0 || config.success_rate > 1.0 {
150            return Err(Error::InvalidSuccessRate(config.success_rate));
151        }
152
153        // Convert Duration to milliseconds as f64 for the Normal distribution
154        let latency_ms = config.latency.as_secs_f64() * 1000.0;
155        let jitter_ms = config.jitter.as_secs_f64() * 1000.0;
156
157        // Create distribution
158        let sampler = Normal::new(latency_ms, jitter_ms).unwrap();
159
160        // Wait for update to complete
161        let (s, r) = oneshot::channel();
162        self.sender
163            .send(Message::AddLink {
164                sender,
165                receiver,
166                sampler,
167                success_rate: config.success_rate,
168                result: s,
169            })
170            .await
171            .map_err(|_| Error::NetworkClosed)?;
172        r.await.map_err(|_| Error::NetworkClosed)?
173    }
174
175    /// Remove a unidirectional link between two peers.
176    ///
177    /// If no link exists, this will return an error.
178    pub async fn remove_link(&mut self, sender: P, receiver: P) -> Result<(), Error> {
179        // Sanity checks
180        if sender == receiver {
181            return Err(Error::LinkingSelf);
182        }
183
184        // Wait for update to complete
185        let (s, r) = oneshot::channel();
186        self.sender
187            .send(Message::RemoveLink {
188                sender,
189                receiver,
190                result: s,
191            })
192            .await
193            .map_err(|_| Error::NetworkClosed)?;
194        r.await.map_err(|_| Error::NetworkClosed)?
195    }
196}
197
198/// Individual control interface for a peer in the simulated network.
199#[derive(Clone)]
200pub struct Control<P: PublicKey> {
201    /// The public key of the peer this control interface is for.
202    me: P,
203
204    /// Sender for messages to the oracle.
205    sender: mpsc::UnboundedSender<Message<P>>,
206}
207
208impl<P: PublicKey> crate::Blocker for Control<P> {
209    type PublicKey = P;
210
211    async fn block(&mut self, public_key: P) {
212        let _ = self
213            .sender
214            .send(Message::Block {
215                from: self.me.clone(),
216                to: public_key,
217            })
218            .await;
219    }
220}