commonware_p2p/lib.rs
1//! Communicate with authenticated peers over encrypted connections.
2//!
3//! # Status
4//!
5//! Stability varies by primitive. See [README](https://github.com/commonwarexyz/monorepo#stability) for details.
6
7#![doc(
8 html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
9 html_favicon_url = "https://commonware.xyz/favicon.ico"
10)]
11
12use commonware_macros::{stability_mod, stability_scope};
13
14stability_mod!(ALPHA, pub mod simulated);
15
16stability_scope!(BETA {
17 use commonware_cryptography::PublicKey;
18 use commonware_runtime::{IoBuf, IoBufMut};
19 use commonware_utils::{channel::mpsc, ordered::{Map, Set}};
20 use std::{error::Error as StdError, fmt::Debug, future::Future, time::SystemTime};
21
22 pub mod authenticated;
23 pub mod types;
24 pub mod utils;
25
26 pub use types::{Address, Ingress};
27
/// A message received from a peer, paired with the public key it was received from.
///
/// The tuple is laid out as `(public key, payload)`.
///
/// This message is guaranteed to adhere to the configuration of the channel and
/// will already be decrypted and authenticated.
pub type Message<P> = (P, IoBuf);
33
/// Identifier for a communication channel (a plain `u64`).
pub type Channel = u64;
36
/// Enum indicating the set of recipients to send a message to.
#[derive(Clone, Debug)]
pub enum Recipients<P: PublicKey> {
    /// Send to all peers (presumably every peer the sender can currently
    /// reach; confirm against the implementation).
    All,
    /// Send to each of the listed peers.
    Some(Vec<P>),
    /// Send to a single peer.
    One(P),
}
44
/// Interface for sending messages to a set of recipients without rate-limiting restrictions.
pub trait UnlimitedSender: Clone + Send + Sync + 'static {
    /// Public key type used to identify recipients.
    type PublicKey: PublicKey;

    /// Error that can occur when sending a message.
    type Error: Debug + StdError + Send + Sync + 'static;

    /// Sends a message to a set of recipients.
    ///
    /// # Arguments
    ///
    /// - `recipients`: the peers to send to (see [`Recipients`]).
    /// - `message`: the payload, converted into an [`IoBufMut`].
    /// - `priority`: presumably requests prioritized delivery of this message;
    ///   the exact semantics are defined by the implementation (TODO confirm).
    ///
    /// # Offline Recipients
    ///
    /// If a recipient is offline at the time a message is sent, the message
    /// will be dropped. It is up to the application to handle retries (if
    /// necessary).
    ///
    /// # Returns
    ///
    /// A vector of recipients that the message was sent to, or an error if the
    /// message could not be sent due to a validation failure (e.g., too large).
    ///
    /// Note: a successful send does not guarantee that the recipient will
    /// receive the message.
    ///
    /// # Graceful Shutdown
    ///
    /// Implementations must handle internal channel closures gracefully during
    /// shutdown. If the underlying network is shutting down, this method should
    /// return `Ok` (possibly with an empty or partial recipient list) rather
    /// than an error. Errors should only be returned for validation failures
    /// that the caller can act upon.
    fn send(
        &mut self,
        recipients: Recipients<Self::PublicKey>,
        message: impl Into<IoBufMut> + Send,
        priority: bool,
    ) -> impl Future<Output = Result<Vec<Self::PublicKey>, Self::Error>> + Send;
}
83
/// Interface for constructing a [`CheckedSender`] from a set of [`Recipients`],
/// filtering out any that are currently rate-limited.
pub trait LimitedSender: Clone + Send + Sync + 'static {
    /// Public key type used to identify recipients.
    type PublicKey: PublicKey;

    /// The type of [`CheckedSender`] returned after checking recipients.
    ///
    /// The lifetime parameter ties the checked sender to the mutable borrow of
    /// `self` taken by [`LimitedSender::check`].
    type Checked<'a>: CheckedSender<PublicKey = Self::PublicKey> + Send
    where
        Self: 'a;

    /// Checks which recipients are within their rate limit and returns a
    /// [`CheckedSender`] for sending to them.
    ///
    /// # Rate Limiting
    ///
    /// Recipients that exceed their rate limit will be filtered out. The
    /// returned [`CheckedSender`] will only send to non-limited recipients.
    ///
    /// # Returns
    ///
    /// `Ok` with a [`CheckedSender`] containing only the recipients that are
    /// not currently rate-limited, or `Err` with the earliest instant (a
    /// [`SystemTime`]) at which all recipients will be available if all are
    /// rate-limited.
    fn check<'a>(
        &'a mut self,
        recipients: Recipients<Self::PublicKey>,
    ) -> impl Future<Output = Result<Self::Checked<'a>, SystemTime>> + Send;
}
113
/// Interface for sending messages to [`Recipients`] that are not currently rate-limited.
pub trait CheckedSender: Send {
    /// Public key type used to identify [`Recipients`].
    type PublicKey: PublicKey;

    /// Error that can occur when sending a message.
    type Error: Debug + StdError + Send + Sync + 'static;

    /// Sends a message to the pre-checked recipients.
    ///
    /// This method takes `self` by value, so a checked sender can be used for
    /// one send only.
    ///
    /// # Arguments
    ///
    /// - `message`: the payload, converted into an [`IoBufMut`].
    /// - `priority`: presumably requests prioritized delivery of this message;
    ///   the exact semantics are defined by the implementation (TODO confirm).
    ///
    /// # Offline Recipients
    ///
    /// If a recipient is offline at the time a message is sent, the message
    /// will be dropped. It is up to the application to handle retries (if
    /// necessary).
    ///
    /// # Returns
    ///
    /// A vector of recipients that the message was sent to, or an error if the
    /// message could not be sent due to a validation failure (e.g., too large).
    ///
    /// Note: a successful send does not guarantee that the recipient will
    /// receive the message.
    ///
    /// # Graceful Shutdown
    ///
    /// Implementations must handle internal channel closures gracefully during
    /// shutdown. If the underlying network is shutting down, this method should
    /// return `Ok` (possibly with an empty or partial recipient list) rather
    /// than an error. Errors should only be returned for validation failures
    /// that the caller can act upon.
    fn send(
        self,
        message: impl Into<IoBufMut> + Send,
        priority: bool,
    ) -> impl Future<Output = Result<Vec<Self::PublicKey>, Self::Error>> + Send;
}
151
152 /// Interface for sending messages to a set of recipients.
153 pub trait Sender: LimitedSender {
154 /// Sends a message to a set of recipients.
155 ///
156 /// # Offline Recipients
157 ///
158 /// If a recipient is offline at the time a message is sent, the message
159 /// will be dropped. It is up to the application to handle retries (if
160 /// necessary).
161 ///
162 /// # Rate Limiting
163 ///
164 /// Recipients that exceed their rate limit will be skipped. The message is
165 /// still sent to non-limited recipients. Check the returned vector to see
166 /// which peers were sent the message.
167 ///
168 /// # Returns
169 ///
170 /// A vector of recipients that the message was sent to, or an error if the
171 /// message could not be sent due to a validation failure (e.g., too large).
172 ///
173 /// Note: a successful send does not guarantee that the recipient will
174 /// receive the message.
175 ///
176 /// # Graceful Shutdown
177 ///
178 /// Implementations must handle internal channel closures gracefully during
179 /// shutdown. If the underlying network is shutting down, this method should
180 /// return `Ok` (possibly with an empty or partial recipient list) rather
181 /// than an error. Errors should only be returned for validation failures
182 /// that the caller can act upon.
183 fn send(
184 &mut self,
185 recipients: Recipients<Self::PublicKey>,
186 message: impl Into<IoBufMut> + Send,
187 priority: bool,
188 ) -> impl Future<
189 Output = Result<Vec<Self::PublicKey>, <Self::Checked<'_> as CheckedSender>::Error>,
190 > + Send {
191 async move {
192 match self.check(recipients).await {
193 Ok(checked_sender) => checked_sender.send(message, priority).await,
194 Err(_) => Ok(Vec::new()),
195 }
196 }
197 }
198 }
199
200 // Blanket implementation of `Sender` for all `LimitedSender`s.
201 impl<S: LimitedSender> Sender for S {}
202
/// Interface for receiving messages from arbitrary senders.
pub trait Receiver: Debug + Send + 'static {
    /// Error that can occur when receiving a message.
    type Error: Debug + StdError + Send + Sync;

    /// Public key type used to identify the peer a message was received from.
    type PublicKey: PublicKey;

    /// Receive a message from an arbitrary sender.
    ///
    /// # Returns
    ///
    /// A [`Message`] (the sender's public key and the payload), or an error if
    /// a message could not be received.
    fn recv(
        &mut self,
    ) -> impl Future<Output = Result<Message<Self::PublicKey>, Self::Error>> + Send;
}
216
/// Interface for reading peer set information.
pub trait Provider: Debug + Clone + Send + 'static {
    /// Public key type used to identify peers.
    type PublicKey: PublicKey;

    /// Fetch the ordered set of peers for a given ID.
    ///
    /// Returns `None` when no peer set is available for `id` (presumably
    /// because it was never tracked or is no longer retained; confirm against
    /// the implementation).
    fn peer_set(
        &mut self,
        id: u64,
    ) -> impl Future<Output = Option<Set<Self::PublicKey>>> + Send;

    /// Subscribe to notifications when new peer sets are added.
    ///
    /// Returns a receiver that will receive tuples of:
    /// - The peer set ID
    /// - The peers in the new set
    /// - All currently tracked peers (union of recent peer sets)
    // The receiver's tuple type is spelled out inline rather than behind an
    // alias, which trips clippy's `type_complexity` lint.
    #[allow(clippy::type_complexity)]
    fn subscribe(
        &mut self,
    ) -> impl Future<
        Output = mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)>,
    > + Send;
}
241
/// Interface for managing peer set membership (where peer addresses are not known).
pub trait Manager: Provider {
    /// Track a peer set with the given ID and peers.
    ///
    /// # Arguments
    ///
    /// - `id`: identifier of the peer set being tracked.
    /// - `peers`: the ordered set of public keys that make up the peer set.
    ///
    /// # Peer Set IDs
    ///
    /// The peer set ID passed to this function should be strictly managed, ideally matching the epoch
    /// of the consensus engine. It must be monotonically increasing as new peer sets are tracked.
    ///
    /// For good connectivity, all peers must track the same peer sets at the same ID.
    fn track(
        &mut self,
        id: u64,
        peers: Set<Self::PublicKey>,
    ) -> impl Future<Output = ()> + Send;
}
256
/// Interface for managing peer set membership (where peer addresses are known).
pub trait AddressableManager: Provider {
    /// Track a peer set with the given ID and a map from each peer's public key to its [`Address`].
    ///
    /// # Arguments
    ///
    /// - `id`: identifier of the peer set being tracked.
    /// - `peers`: the peers in the set, each mapped to its [`Address`].
    ///
    /// # Peer Set IDs
    ///
    /// The peer set ID passed to this function should be strictly managed, ideally matching the epoch
    /// of the consensus engine. It must be monotonically increasing as new peer sets are tracked.
    ///
    /// For good connectivity, all peers must track the same peer sets at the same ID.
    fn track(
        &mut self,
        id: u64,
        peers: Map<Self::PublicKey, Address>,
    ) -> impl Future<Output = ()> + Send;

    /// Update addresses for multiple peers without creating a new peer set.
    ///
    /// For each peer that is tracked and has a changed address:
    /// - Any existing connection to the peer is severed (it was on the old IP)
    /// - The listener's allowed IPs are updated to reflect the new egress IP
    /// - Future connections will use the new address
    ///
    /// Entries for peers that are not tracked, or whose address is unchanged,
    /// are presumably ignored (confirm against the implementation).
    fn overwrite(
        &mut self,
        peers: Map<Self::PublicKey, Address>,
    ) -> impl Future<Output = ()> + Send;
}
282
/// Interface for blocking other peers.
pub trait Blocker: Clone + Send + 'static {
    /// Public key type used to identify peers.
    type PublicKey: PublicKey;

    /// Block a peer, disconnecting them if currently connected and preventing future connections.
    ///
    /// The returned future resolves to `()`, so no outcome is reported back to
    /// the caller.
    fn block(&mut self, peer: Self::PublicKey) -> impl Future<Output = ()> + Send;
}
291});