commonware_p2p/lib.rs
1//! Communicate with authenticated peers over encrypted connections.
2//!
3//! # Status
4//!
5//! Stability varies by primitive. See [README](https://github.com/commonwarexyz/monorepo#stability) for details.
6
7#![doc(
8 html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
9 html_favicon_url = "https://commonware.xyz/favicon.ico"
10)]
11
12use commonware_macros::{stability_mod, stability_scope};
13
14stability_mod!(ALPHA, pub mod simulated);
15
16stability_scope!(BETA {
17 use commonware_cryptography::PublicKey;
18 use commonware_runtime::{IoBuf, IoBufs};
19 use commonware_utils::{
20 channel::mpsc,
21 ordered::{Map, Set},
22 };
23 use std::{error::Error as StdError, fmt::Debug, future::Future, time::SystemTime};
24
25 pub mod authenticated;
26 pub mod types;
27 pub mod utils;
28
29 pub use types::{Address, Ingress};
30
31 /// Tuple representing a message received from a given public key.
32 ///
33 /// This message is guaranteed to adhere to the configuration of the channel and
34 /// will already be decrypted and authenticated.
35 pub type Message<P> = (P, IoBuf);
36
37 /// Alias for identifying communication channels.
38 pub type Channel = u64;
39
40 /// Enum indicating the set of recipients to send a message to.
41 #[derive(Clone, Debug)]
42 pub enum Recipients<P: PublicKey> {
43 All,
44 Some(Vec<P>),
45 One(P),
46 }
47
48 /// Interface for sending messages to a set of recipients without rate-limiting restrictions.
49 pub trait UnlimitedSender: Clone + Send + Sync + 'static {
50 /// Public key type used to identify recipients.
51 type PublicKey: PublicKey;
52
53 /// Error that can occur when sending a message.
54 type Error: Debug + StdError + Send + Sync + 'static;
55
56 /// Sends a message to a set of recipients.
57 ///
58 /// # Offline Recipients
59 ///
60 /// If a recipient is offline at the time a message is sent, the message
61 /// will be dropped. It is up to the application to handle retries (if
62 /// necessary).
63 ///
64 /// # Returns
65 ///
66 /// A vector of recipients that the message was sent to, or an error if the
67 /// message could not be sent due to a validation failure (e.g., too large).
68 ///
69 /// Note: a successful send does not guarantee that the recipient will
70 /// receive the message.
71 ///
72 /// # Graceful Shutdown
73 ///
74 /// Implementations must handle internal channel closures gracefully during
75 /// shutdown. If the underlying network is shutting down, this method should
76 /// return `Ok` (possibly with an empty or partial recipient list) rather
77 /// than an error. Errors should only be returned for validation failures
78 /// that the caller can act upon.
79 fn send(
80 &mut self,
81 recipients: Recipients<Self::PublicKey>,
82 message: impl Into<IoBufs> + Send,
83 priority: bool,
84 ) -> impl Future<Output = Result<Vec<Self::PublicKey>, Self::Error>> + Send;
85 }
86
87 /// Interface for constructing a [`CheckedSender`] from a set of [`Recipients`],
88 /// filtering out any that are currently rate-limited.
89 pub trait LimitedSender: Clone + Send + Sync + 'static {
90 /// Public key type used to identify recipients.
91 type PublicKey: PublicKey;
92
93 /// The type of [`CheckedSender`] returned after checking recipients.
94 type Checked<'a>: CheckedSender<PublicKey = Self::PublicKey> + Send
95 where
96 Self: 'a;
97
98 /// Checks which recipients are within their rate limit and returns a
99 /// [`CheckedSender`] for sending to them.
100 ///
101 /// # Rate Limiting
102 ///
103 /// Recipients that exceed their rate limit will be filtered out. The
104 /// returned [`CheckedSender`] will only send to non-limited recipients.
105 ///
106 /// # Returns
107 ///
108 /// A [`CheckedSender`] containing only the recipients that are not
109 /// currently rate-limited, or an error with the earliest instant at which
110 /// all recipients will be available if all are rate-limited.
111 fn check<'a>(
112 &'a mut self,
113 recipients: Recipients<Self::PublicKey>,
114 ) -> impl Future<Output = Result<Self::Checked<'a>, SystemTime>> + Send;
115 }
116
117 /// Interface for sending messages to [`Recipients`] that are not currently rate-limited.
118 pub trait CheckedSender: Send {
119 /// Public key type used to identify [`Recipients`].
120 type PublicKey: PublicKey;
121
122 /// Error that can occur when sending a message.
123 type Error: Debug + StdError + Send + Sync + 'static;
124
125 /// Sends a message to the pre-checked recipients.
126 ///
127 /// # Offline Recipients
128 ///
129 /// If a recipient is offline at the time a message is sent, the message
130 /// will be dropped. It is up to the application to handle retries (if
131 /// necessary).
132 ///
133 /// # Returns
134 ///
135 /// A vector of recipients that the message was sent to, or an error if the
136 /// message could not be sent due to a validation failure (e.g., too large).
137 ///
138 /// Note: a successful send does not guarantee that the recipient will
139 /// receive the message.
140 ///
141 /// # Graceful Shutdown
142 ///
143 /// Implementations must handle internal channel closures gracefully during
144 /// shutdown. If the underlying network is shutting down, this method should
145 /// return `Ok` (possibly with an empty or partial recipient list) rather
146 /// than an error. Errors should only be returned for validation failures
147 /// that the caller can act upon.
148 fn send(
149 self,
150 message: impl Into<IoBufs> + Send,
151 priority: bool,
152 ) -> impl Future<Output = Result<Vec<Self::PublicKey>, Self::Error>> + Send;
153 }
154
155 /// Interface for sending messages to a set of recipients.
156 pub trait Sender: LimitedSender {
157 /// Sends a message to a set of recipients.
158 ///
159 /// # Offline Recipients
160 ///
161 /// If a recipient is offline at the time a message is sent, the message
162 /// will be dropped. It is up to the application to handle retries (if
163 /// necessary).
164 ///
165 /// # Rate Limiting
166 ///
167 /// Recipients that exceed their rate limit will be skipped. The message is
168 /// still sent to non-limited recipients. Check the returned vector to see
169 /// which peers were sent the message.
170 ///
171 /// # Returns
172 ///
173 /// A vector of recipients that the message was sent to, or an error if the
174 /// message could not be sent due to a validation failure (e.g., too large).
175 ///
176 /// Note: a successful send does not guarantee that the recipient will
177 /// receive the message.
178 ///
179 /// # Graceful Shutdown
180 ///
181 /// Implementations must handle internal channel closures gracefully during
182 /// shutdown. If the underlying network is shutting down, this method should
183 /// return `Ok` (possibly with an empty or partial recipient list) rather
184 /// than an error. Errors should only be returned for validation failures
185 /// that the caller can act upon.
186 fn send(
187 &mut self,
188 recipients: Recipients<Self::PublicKey>,
189 message: impl Into<IoBufs> + Send,
190 priority: bool,
191 ) -> impl Future<
192 Output = Result<Vec<Self::PublicKey>, <Self::Checked<'_> as CheckedSender>::Error>,
193 > + Send {
194 async move {
195 match self.check(recipients).await {
196 Ok(checked_sender) => checked_sender.send(message, priority).await,
197 Err(_) => Ok(Vec::new()),
198 }
199 }
200 }
201 }
202
203 // Blanket implementation of `Sender` for all `LimitedSender`s.
204 impl<S: LimitedSender> Sender for S {}
205
206 /// Interface for receiving messages from arbitrary recipients.
207 pub trait Receiver: Debug + Send + 'static {
208 /// Error that can occur when receiving a message.
209 type Error: Debug + StdError + Send + Sync;
210
211 /// Public key type used to identify recipients.
212 type PublicKey: PublicKey;
213
214 /// Receive a message from an arbitrary recipient.
215 fn recv(
216 &mut self,
217 ) -> impl Future<Output = Result<Message<Self::PublicKey>, Self::Error>> + Send;
218 }
219
220 /// Notification sent to subscribers when a peer set changes.
221 #[derive(Clone, Debug)]
222 pub struct PeerSetUpdate<P: PublicKey> {
223 /// The index of the peer set that changed.
224 pub index: u64,
225 /// The primary and secondary peers in the new set.
226 pub latest: TrackedPeers<P>,
227 /// Union of primary and secondary peers across all tracked peer sets.
228 pub all: TrackedPeers<P>,
229 }
230
231 /// Alias for the subscription type returned by [`Provider::subscribe`].
232 pub type PeerSetSubscription<P> = mpsc::UnboundedReceiver<PeerSetUpdate<P>>;
233
234 /// Primary and secondary peers provided together to [`Manager::track`].
235 ///
236 /// The same public key may appear in both `primary` and `secondary`. [`Manager::track`]
237 /// deduplicates overlapping keys, storing them as primary only.
238 #[derive(Clone, Debug, PartialEq, Eq)]
239 pub struct TrackedPeers<P: PublicKey> {
240 /// Peers eligible for primary-only policies.
241 pub primary: Set<P>,
242 /// Peers eligible for secondary-only policies.
243 pub secondary: Set<P>,
244 }
245
246 impl<P: PublicKey> TrackedPeers<P> {
247 pub const fn new(primary: Set<P>, secondary: Set<P>) -> Self {
248 Self { primary, secondary }
249 }
250
251 pub fn primary(primary: Set<P>) -> Self {
252 Self::new(primary, Set::default())
253 }
254
255 /// Returns the deduplicated union of primary and secondary peers.
256 pub fn union(self) -> Set<P> {
257 Set::from_iter_dedup(self.primary.into_iter().chain(self.secondary))
258 }
259 }
260
261 impl<P: PublicKey> From<Set<P>> for TrackedPeers<P> {
262 fn from(primary: Set<P>) -> Self {
263 Self::primary(primary)
264 }
265 }
266
267 impl<P: PublicKey> Default for TrackedPeers<P> {
268 fn default() -> Self {
269 Self::new(Set::default(), Set::default())
270 }
271 }
272
273 /// Primary and secondary peers provided together to [`AddressableManager::track`].
274 ///
275 /// The same public key may appear in both maps. [`AddressableManager::track`]
276 /// deduplicates overlapping keys, storing them as primary only.
277 #[derive(Clone, Debug)]
278 pub struct AddressableTrackedPeers<P: PublicKey> {
279 /// Addresses for peers eligible for primary-only policies.
280 pub primary: Map<P, Address>,
281 /// Addresses for peers eligible for secondary-only policies.
282 pub secondary: Map<P, Address>,
283 }
284
285 impl<P: PublicKey> AddressableTrackedPeers<P> {
286 pub const fn new(primary: Map<P, Address>, secondary: Map<P, Address>) -> Self {
287 Self { primary, secondary }
288 }
289
290 pub fn primary(primary: Map<P, Address>) -> Self {
291 Self::new(primary, Map::default())
292 }
293 }
294
295 impl<P: PublicKey> From<Map<P, Address>> for AddressableTrackedPeers<P> {
296 fn from(primary: Map<P, Address>) -> Self {
297 Self::primary(primary)
298 }
299 }
300
301 /// Interface for reading peer set information.
302 pub trait Provider: Debug + Clone + Send + 'static {
303 /// Public key type used to identify peers.
304 type PublicKey: PublicKey;
305
306 /// Fetch the primary and secondary peers tracked at the given ID.
307 fn peer_set(
308 &mut self,
309 id: u64,
310 ) -> impl Future<Output = Option<TrackedPeers<Self::PublicKey>>> + Send;
311
312 /// Subscribe to notifications when new peer sets are added.
313 ///
314 /// Returns a receiver of [`PeerSetUpdate`] notifications. Each update's
315 /// `latest` reflects how [`Manager::track`] stored the set: a peer listed in
316 /// both roles appears only under `latest.primary`. The `all` field aggregates
317 /// across tracked sets with the same rule (secondary excludes keys present as primary).
318 fn subscribe(
319 &mut self,
320 ) -> impl Future<Output = PeerSetSubscription<Self::PublicKey>> + Send;
321 }
322
323 /// Interface for managing peer set membership (where peer addresses are not known).
324 pub trait Manager: Provider {
325 /// Track a primary and secondary peer set with the given ID.
326 ///
327 /// The peer set ID passed to this function should be strictly managed, ideally matching the epoch
328 /// of the consensus engine. It must be monotonically increasing as new peer sets are
329 /// tracked.
330 ///
331 /// For good connectivity, all peers must track the same peer sets at the same ID.
332 ///
333 /// Callers may pass either a list of primary peers or a [`TrackedPeers`] value containing both primary and secondary peers.
334 ///
335 /// Overlapping keys in [`TrackedPeers`] are allowed; they are deduplicated as primary only.
336 ///
337 /// ## Active Peers
338 ///
339 /// The most recently registered peer set (highest ID) is considered the
340 /// active set. Implementations use the active set to decide which peers to
341 /// maintain connections with and which to disconnect from.
342 ///
343 /// ## Primary vs Secondary Peers
344 ///
345 /// In p2p networks, there are often two tiers of peers: ones that help "drive progress" and ones that want to
346 /// "follow that progress" (but not contribute to it). We call the former "primary" and the latter "secondary".
347 /// When both are tracked, mechanisms favor "primary" peers but continue to replicate data to "secondary" peers (
348 /// often both gossiping data to them and answering requests from them).
349 fn track<R>(
350 &mut self,
351 id: u64,
352 peers: R,
353 ) -> impl Future<Output = ()> + Send
354 where
355 R: Into<TrackedPeers<Self::PublicKey>> + Send;
356 }
357
358 /// Interface for managing peer set membership (where peer addresses are known).
359 pub trait AddressableManager: Provider {
360 /// Track a primary peer set and secondary peers with the given ID.
361 ///
362 /// The peer set ID passed to this function should be strictly managed, ideally matching the epoch
363 /// of the consensus engine. It must be monotonically increasing as new peer sets are
364 /// tracked.
365 ///
366 /// For good connectivity, all peers must track the same peer sets at the same ID.
367 ///
368 /// Callers may pass either a list of primary peers or a [`AddressableTrackedPeers`] value containing
369 /// both primary and secondary peers.
370 ///
371 /// The same key may appear in both maps; see [`AddressableTrackedPeers`].
372 ///
373 /// ## Active Peers
374 ///
375 /// The most recently registered peer set (highest ID) is considered the
376 /// active set. Implementations use the active set to decide which peers to
377 /// maintain connections with and which to disconnect from.
378 ///
379 /// ## Primary vs Secondary Peers
380 ///
381 /// In p2p networks, there are often two tiers of peers: ones that help "drive progress" and ones that want to
382 /// "follow that progress" (but not contribute to it). We call the former "primary" and the latter "secondary".
383 /// When both are tracked, mechanisms favor "primary" peers but continue to replicate data to "secondary" peers (
384 /// often both gossiping data to them and answering requests from them).
385 fn track<R>(
386 &mut self,
387 id: u64,
388 peers: R,
389 ) -> impl Future<Output = ()> + Send
390 where
391 R: Into<AddressableTrackedPeers<Self::PublicKey>> + Send;
392
393 /// Update addresses for multiple peers without creating a new peer set.
394 ///
395 /// For each primary or secondary peer with a changed address:
396 /// - Any existing connection to the peer is severed (it was on the old IP)
397 /// - The listener's allowed IPs are updated to reflect the new egress IP
398 /// - Future connections will use the new address
399 fn overwrite(
400 &mut self,
401 peers: Map<Self::PublicKey, Address>,
402 ) -> impl Future<Output = ()> + Send;
403 }
404
405 /// Interface for blocking other peers.
406 pub trait Blocker: Clone + Send + 'static {
407 /// Public key type used to identify peers.
408 type PublicKey: PublicKey;
409
410 /// Block a peer, disconnecting them if currently connected and preventing future connections.
411 fn block(&mut self, peer: Self::PublicKey) -> impl Future<Output = ()> + Send;
412 }
413});
414
415/// Logs a warning and blocks a peer in a single call.
416///
417/// This macro combines a [`tracing::warn!`] with a [`Blocker::block`] call
418/// to ensure consistent logging at every block site. The peer is always
419/// included as a `peer` field in the log output.
420///
421/// # Examples
422///
423/// ```ignore
424/// block!(self.blocker, sender, "invalid message");
425/// block!(self.blocker, sender, ?err, "invalid ack signature");
426/// block!(self.blocker, sender, %view, "blocking peer for epoch mismatch");
427/// ```
428#[cfg(not(any(
429 commonware_stability_GAMMA,
430 commonware_stability_DELTA,
431 commonware_stability_EPSILON,
432 commonware_stability_RESERVED
433)))] // BETA
434#[macro_export]
435macro_rules! block {
436 ($blocker:expr, $peer:expr, $($arg:tt)+) => {
437 let peer = $peer;
438 tracing::warn!(peer = ?peer, $($arg)+);
439 #[allow(clippy::disallowed_methods)]
440 $blocker.block(peer).await;
441 };
442}
443
444/// Block a peer without logging.
445#[allow(
446 clippy::disallowed_methods,
447 reason = "test helper that bypasses the block! macro"
448)]
449#[cfg(test)]
450pub async fn block_peer<B: Blocker>(blocker: &mut B, peer: B::PublicKey) {
451 blocker.block(peer).await;
452}