commonware_p2p/simulated/
ingress.rs

1use super::{Error, Receiver, Sender};
2use crate::Channel;
3use commonware_utils::Array;
4use futures::{
5    channel::{mpsc, oneshot},
6    SinkExt,
7};
8use rand_distr::Normal;
9
10pub enum Message<P: Array> {
11    Register {
12        public_key: P,
13        channel: Channel,
14        #[allow(clippy::type_complexity)]
15        result: oneshot::Sender<Result<(Sender<P>, Receiver<P>), Error>>,
16    },
17    AddLink {
18        sender: P,
19        receiver: P,
20        sampler: Normal<f64>,
21        success_rate: f64,
22        result: oneshot::Sender<Result<(), Error>>,
23    },
24    RemoveLink {
25        sender: P,
26        receiver: P,
27        result: oneshot::Sender<Result<(), Error>>,
28    },
29    Block {
30        /// The public key of the peer sending the block request.
31        from: P,
32        /// The public key of the peer to block.
33        to: P,
34    },
35    Blocked {
36        result: oneshot::Sender<Result<Vec<(P, P)>, Error>>,
37    },
38}
39
40/// Describes a connection between two peers.
41///
42/// Links are unidirectional (and must be set up in both directions
43/// for a bidirectional connection).
44#[derive(Clone)]
45pub struct Link {
46    /// Mean latency for the delivery of a message in milliseconds.
47    pub latency: f64,
48
49    /// Standard deviation of the latency for the delivery of a message in milliseconds.
50    pub jitter: f64,
51
52    /// Probability of a message being delivered successfully (in range \[0,1\]).
53    pub success_rate: f64,
54}
55
56/// Interface for modifying the simulated network.
57///
58/// At any point, peers can be added/removed and links
59/// between said peers can be modified.
60#[derive(Clone)]
61pub struct Oracle<P: Array> {
62    sender: mpsc::UnboundedSender<Message<P>>,
63}
64
65impl<P: Array> Oracle<P> {
66    /// Create a new instance of the oracle.
67    pub(crate) fn new(sender: mpsc::UnboundedSender<Message<P>>) -> Self {
68        Self { sender }
69    }
70
71    /// Spawn an individual control interface for a peer in the simulated network.
72    pub fn control(&self, me: P) -> Control<P> {
73        Control {
74            me,
75            sender: self.sender.clone(),
76        }
77    }
78
79    /// Return a list of all blocked peers.
80    pub async fn blocked(&mut self) -> Result<Vec<(P, P)>, Error> {
81        let (s, r) = oneshot::channel();
82        self.sender
83            .send(Message::Blocked { result: s })
84            .await
85            .map_err(|_| Error::NetworkClosed)?;
86        r.await.map_err(|_| Error::NetworkClosed)?
87    }
88
89    /// Register a new peer with the network that can interact over a given channel.
90    ///
91    /// By default, the peer will not be linked to any other peers. If a peer is already
92    /// registered on a given channel, it will return an error.
93    pub async fn register(
94        &mut self,
95        public_key: P,
96        channel: Channel,
97    ) -> Result<(Sender<P>, Receiver<P>), Error> {
98        let (sender, receiver) = oneshot::channel();
99        self.sender
100            .send(Message::Register {
101                public_key,
102                channel,
103                result: sender,
104            })
105            .await
106            .map_err(|_| Error::NetworkClosed)?;
107        receiver.await.map_err(|_| Error::NetworkClosed)?
108    }
109
110    /// Create a unidirectional link between two peers.
111    ///
112    /// Link can be called multiple times for the same sender/receiver. The latest
113    /// setting will be used.
114    pub async fn add_link(&mut self, sender: P, receiver: P, config: Link) -> Result<(), Error> {
115        // Sanity checks
116        if sender == receiver {
117            return Err(Error::LinkingSelf);
118        }
119        if config.success_rate < 0.0 || config.success_rate > 1.0 {
120            return Err(Error::InvalidSuccessRate(config.success_rate));
121        }
122        if config.latency < 0.0 || config.jitter < 0.0 {
123            return Err(Error::InvalidBehavior(config.latency, config.jitter));
124        }
125
126        // Create distribution
127        let sampler = Normal::new(config.latency, config.jitter)
128            .map_err(|_| Error::InvalidBehavior(config.latency, config.jitter))?;
129
130        // Wait for update to complete
131        let (s, r) = oneshot::channel();
132        self.sender
133            .send(Message::AddLink {
134                sender,
135                receiver,
136                sampler,
137                success_rate: config.success_rate,
138                result: s,
139            })
140            .await
141            .map_err(|_| Error::NetworkClosed)?;
142        r.await.map_err(|_| Error::NetworkClosed)?
143    }
144
145    /// Remove a unidirectional link between two peers.
146    ///
147    /// If no link exists, this will return an error.
148    pub async fn remove_link(&mut self, sender: P, receiver: P) -> Result<(), Error> {
149        // Sanity checks
150        if sender == receiver {
151            return Err(Error::LinkingSelf);
152        }
153
154        // Wait for update to complete
155        let (s, r) = oneshot::channel();
156        self.sender
157            .send(Message::RemoveLink {
158                sender,
159                receiver,
160                result: s,
161            })
162            .await
163            .map_err(|_| Error::NetworkClosed)?;
164        r.await.map_err(|_| Error::NetworkClosed)?
165    }
166}
167
168/// Individual control interface for a peer in the simulated network.
169#[derive(Clone)]
170pub struct Control<P: Array> {
171    /// The public key of the peer this control interface is for.
172    me: P,
173
174    /// Sender for messages to the oracle.
175    sender: mpsc::UnboundedSender<Message<P>>,
176}
177
178impl<P: Array> crate::Blocker for Control<P> {
179    type PublicKey = P;
180
181    async fn block(&mut self, public_key: P) {
182        let _ = self
183            .sender
184            .send(Message::Block {
185                from: self.me.clone(),
186                to: public_key,
187            })
188            .await;
189    }
190}