commonware_p2p/lib.rs
1//! Communicate with authenticated peers over encrypted connections.
2//!
3//! # Status
4//!
5//! Stability varies by primitive. See [README](https://github.com/commonwarexyz/monorepo#stability) for details.
6
7#![doc(
8 html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
9 html_favicon_url = "https://commonware.xyz/favicon.ico"
10)]
11
12use commonware_macros::{stability_mod, stability_scope};
13
14stability_mod!(ALPHA, pub mod simulated);
15
16stability_scope!(BETA {
17 use commonware_cryptography::PublicKey;
18 use commonware_runtime::{IoBuf, IoBufs};
19 use commonware_utils::{
20 channel::mpsc,
21 ordered::{Map, Set},
22 };
23 use std::{error::Error as StdError, fmt::Debug, future::Future, time::SystemTime};
24
25 pub mod authenticated;
26 pub mod types;
27 pub mod utils;
28
29 pub use types::{Address, Ingress};
30
31 /// Tuple representing a message received from a given public key.
32 ///
33 /// This message is guaranteed to adhere to the configuration of the channel and
34 /// will already be decrypted and authenticated.
35 pub type Message<P> = (P, IoBuf);
36
/// Numeric identifier used to tell communication channels apart.
pub type Channel = u64;
39
40 /// Enum indicating the set of recipients to send a message to.
41 #[derive(Clone, Debug)]
42 pub enum Recipients<P: PublicKey> {
43 All,
44 Some(Vec<P>),
45 One(P),
46 }
47
48 /// Interface for sending messages to a set of recipients without rate-limiting restrictions.
49 pub trait UnlimitedSender: Clone + Send + Sync + 'static {
50 /// Public key type used to identify recipients.
51 type PublicKey: PublicKey;
52
53 /// Error that can occur when sending a message.
54 type Error: Debug + StdError + Send + Sync + 'static;
55
56 /// Sends a message to a set of recipients.
57 ///
58 /// # Offline Recipients
59 ///
60 /// If a recipient is offline at the time a message is sent, the message
61 /// will be dropped. It is up to the application to handle retries (if
62 /// necessary).
63 ///
64 /// # Returns
65 ///
66 /// A vector of recipients that the message was sent to, or an error if the
67 /// message could not be sent due to a validation failure (e.g., too large).
68 ///
69 /// Note: a successful send does not guarantee that the recipient will
70 /// receive the message.
71 ///
72 /// # Graceful Shutdown
73 ///
74 /// Implementations must handle internal channel closures gracefully during
75 /// shutdown. If the underlying network is shutting down, this method should
76 /// return `Ok` (possibly with an empty or partial recipient list) rather
77 /// than an error. Errors should only be returned for validation failures
78 /// that the caller can act upon.
79 fn send(
80 &mut self,
81 recipients: Recipients<Self::PublicKey>,
82 message: impl Into<IoBufs> + Send,
83 priority: bool,
84 ) -> impl Future<Output = Result<Vec<Self::PublicKey>, Self::Error>> + Send;
85 }
86
87 /// Interface for constructing a [`CheckedSender`] from a set of [`Recipients`],
88 /// filtering out any that are currently rate-limited.
89 pub trait LimitedSender: Clone + Send + Sync + 'static {
90 /// Public key type used to identify recipients.
91 type PublicKey: PublicKey;
92
93 /// The type of [`CheckedSender`] returned after checking recipients.
94 type Checked<'a>: CheckedSender<PublicKey = Self::PublicKey> + Send
95 where
96 Self: 'a;
97
98 /// Checks which recipients are within their rate limit and returns a
99 /// [`CheckedSender`] for sending to them.
100 ///
101 /// # Rate Limiting
102 ///
103 /// Recipients that exceed their rate limit will be filtered out. The
104 /// returned [`CheckedSender`] will only send to non-limited recipients.
105 ///
106 /// # Returns
107 ///
108 /// A [`CheckedSender`] containing only the recipients that are not
109 /// currently rate-limited, or an error with the earliest instant at which
110 /// all recipients will be available if all are rate-limited.
111 fn check<'a>(
112 &'a mut self,
113 recipients: Recipients<Self::PublicKey>,
114 ) -> impl Future<Output = Result<Self::Checked<'a>, SystemTime>> + Send;
115 }
116
117 /// Interface for sending messages to [`Recipients`] that are not currently rate-limited.
118 pub trait CheckedSender: Send {
119 /// Public key type used to identify [`Recipients`].
120 type PublicKey: PublicKey;
121
122 /// Error that can occur when sending a message.
123 type Error: Debug + StdError + Send + Sync + 'static;
124
125 /// Sends a message to the pre-checked recipients.
126 ///
127 /// # Offline Recipients
128 ///
129 /// If a recipient is offline at the time a message is sent, the message
130 /// will be dropped. It is up to the application to handle retries (if
131 /// necessary).
132 ///
133 /// # Returns
134 ///
135 /// A vector of recipients that the message was sent to, or an error if the
136 /// message could not be sent due to a validation failure (e.g., too large).
137 ///
138 /// Note: a successful send does not guarantee that the recipient will
139 /// receive the message.
140 ///
141 /// # Graceful Shutdown
142 ///
143 /// Implementations must handle internal channel closures gracefully during
144 /// shutdown. If the underlying network is shutting down, this method should
145 /// return `Ok` (possibly with an empty or partial recipient list) rather
146 /// than an error. Errors should only be returned for validation failures
147 /// that the caller can act upon.
148 fn send(
149 self,
150 message: impl Into<IoBufs> + Send,
151 priority: bool,
152 ) -> impl Future<Output = Result<Vec<Self::PublicKey>, Self::Error>> + Send;
153 }
154
155 /// Interface for sending messages to a set of recipients.
156 pub trait Sender: LimitedSender {
157 /// Sends a message to a set of recipients.
158 ///
159 /// # Offline Recipients
160 ///
161 /// If a recipient is offline at the time a message is sent, the message
162 /// will be dropped. It is up to the application to handle retries (if
163 /// necessary).
164 ///
165 /// # Rate Limiting
166 ///
167 /// Recipients that exceed their rate limit will be skipped. The message is
168 /// still sent to non-limited recipients. Check the returned vector to see
169 /// which peers were sent the message.
170 ///
171 /// # Returns
172 ///
173 /// A vector of recipients that the message was sent to, or an error if the
174 /// message could not be sent due to a validation failure (e.g., too large).
175 ///
176 /// Note: a successful send does not guarantee that the recipient will
177 /// receive the message.
178 ///
179 /// # Graceful Shutdown
180 ///
181 /// Implementations must handle internal channel closures gracefully during
182 /// shutdown. If the underlying network is shutting down, this method should
183 /// return `Ok` (possibly with an empty or partial recipient list) rather
184 /// than an error. Errors should only be returned for validation failures
185 /// that the caller can act upon.
186 fn send(
187 &mut self,
188 recipients: Recipients<Self::PublicKey>,
189 message: impl Into<IoBufs> + Send,
190 priority: bool,
191 ) -> impl Future<
192 Output = Result<Vec<Self::PublicKey>, <Self::Checked<'_> as CheckedSender>::Error>,
193 > + Send {
194 async move {
195 match self.check(recipients).await {
196 Ok(checked_sender) => checked_sender.send(message, priority).await,
197 Err(_) => Ok(Vec::new()),
198 }
199 }
200 }
201 }
202
203 // Blanket implementation of `Sender` for all `LimitedSender`s.
204 impl<S: LimitedSender> Sender for S {}
205
206 /// Interface for receiving messages from arbitrary recipients.
207 pub trait Receiver: Debug + Send + 'static {
208 /// Error that can occur when receiving a message.
209 type Error: Debug + StdError + Send + Sync;
210
211 /// Public key type used to identify recipients.
212 type PublicKey: PublicKey;
213
214 /// Receive a message from an arbitrary recipient.
215 fn recv(
216 &mut self,
217 ) -> impl Future<Output = Result<Message<Self::PublicKey>, Self::Error>> + Send;
218 }
219
220 /// Alias for the subscription type returned by [`Provider::subscribe`].
221 pub type PeerSetSubscription<P> = mpsc::UnboundedReceiver<(u64, Set<P>, Set<P>)>;
222
223 /// Interface for reading peer set information.
224 pub trait Provider: Debug + Clone + Send + 'static {
225 /// Public key type used to identify peers.
226 type PublicKey: PublicKey;
227
228 /// Fetch the ordered set of peers for a given ID.
229 fn peer_set(
230 &mut self,
231 id: u64,
232 ) -> impl Future<Output = Option<Set<Self::PublicKey>>> + Send;
233
234 /// Subscribe to notifications when new peer sets are added.
235 ///
236 /// Returns a receiver that will receive tuples of:
237 /// - The peer set ID
238 /// - The peers in the new set
239 /// - All currently tracked peers (union of recent peer sets)
240 #[allow(clippy::type_complexity)]
241 fn subscribe(
242 &mut self,
243 ) -> impl Future<Output = PeerSetSubscription<Self::PublicKey>> + Send;
244 }
245
246 /// Interface for managing peer set membership (where peer addresses are not known).
247 pub trait Manager: Provider {
248 /// Track a peer set with the given ID and peers.
249 ///
250 /// The peer set ID passed to this function should be strictly managed, ideally matching the epoch
251 /// of the consensus engine. It must be monotonically increasing as new peer sets are tracked.
252 ///
253 /// For good connectivity, all peers must track the same peer sets at the same ID.
254 fn track(
255 &mut self,
256 id: u64,
257 peers: Set<Self::PublicKey>,
258 ) -> impl Future<Output = ()> + Send;
259 }
260
261 /// Interface for managing peer set membership (where peer addresses are known).
262 pub trait AddressableManager: Provider {
263 /// Track a peer set with the given ID and peer<PublicKey, Address> pairs.
264 ///
265 /// The peer set ID passed to this function should be strictly managed, ideally matching the epoch
266 /// of the consensus engine. It must be monotonically increasing as new peer sets are tracked.
267 ///
268 /// For good connectivity, all peers must track the same peer sets at the same ID.
269 fn track(
270 &mut self,
271 id: u64,
272 peers: Map<Self::PublicKey, Address>,
273 ) -> impl Future<Output = ()> + Send;
274
275 /// Update addresses for multiple peers without creating a new peer set.
276 ///
277 /// For each peer that is tracked and has a changed address:
278 /// - Any existing connection to the peer is severed (it was on the old IP)
279 /// - The listener's allowed IPs are updated to reflect the new egress IP
280 /// - Future connections will use the new address
281 fn overwrite(
282 &mut self,
283 peers: Map<Self::PublicKey, Address>,
284 ) -> impl Future<Output = ()> + Send;
285 }
286
287 /// Interface for blocking other peers.
288 pub trait Blocker: Clone + Send + 'static {
289 /// Public key type used to identify peers.
290 type PublicKey: PublicKey;
291
292 /// Block a peer, disconnecting them if currently connected and preventing future connections.
293 fn block(&mut self, peer: Self::PublicKey) -> impl Future<Output = ()> + Send;
294 }
295});
296
/// Logs a warning and blocks a peer in a single call.
///
/// This macro combines a [`tracing::warn!`] with a [`Blocker::block`] call
/// to ensure consistent logging at every block site. The peer is always
/// included as a `peer` field in the log output (formatted with `Debug`).
///
/// The expansion awaits the block call, so the macro must be invoked from an
/// async context. It expands to a single block expression evaluating to `()`,
/// which means it can be used as a statement as well as in expression
/// position (for example, directly as a `match` arm).
///
/// `tracing` is referenced by its plain path, so it must be resolvable at the
/// call site.
///
/// # Examples
///
/// ```ignore
/// block!(self.blocker, sender, "invalid message");
/// block!(self.blocker, sender, ?err, "invalid ack signature");
/// block!(self.blocker, sender, %view, "blocking peer for epoch mismatch");
/// ```
#[cfg(not(any(
    commonware_stability_GAMMA,
    commonware_stability_DELTA,
    commonware_stability_EPSILON,
    commonware_stability_RESERVED
)))] // BETA
#[macro_export]
macro_rules! block {
    ($blocker:expr, $peer:expr, $($arg:tt)+) => {{
        // Evaluate the peer expression exactly once: it is borrowed for the
        // log line and then moved into the block call.
        let peer = $peer;
        tracing::warn!(peer = ?peer, $($arg)+);
        #[allow(clippy::disallowed_methods)]
        $blocker.block(peer).await;
    }};
}
325
/// Blocks `peer` through `blocker` directly, without emitting any log output.
///
/// Only compiled for tests: it calls [`Blocker::block`] itself instead of
/// going through the `block!` macro.
#[cfg(test)]
#[allow(
    clippy::disallowed_methods,
    reason = "test helper that bypasses the block! macro"
)]
pub async fn block_peer<B>(blocker: &mut B, peer: B::PublicKey)
where
    B: Blocker,
{
    blocker.block(peer).await;
}