Skip to main content

commonware_p2p/
lib.rs

1//! Communicate with authenticated peers over encrypted connections.
2//!
3//! # Status
4//!
5//! Stability varies by primitive. See [README](https://github.com/commonwarexyz/monorepo#stability) for details.
6
7#![doc(
8    html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
9    html_favicon_url = "https://commonware.xyz/favicon.ico"
10)]
11
12use commonware_macros::{stability_mod, stability_scope};
13
14stability_mod!(ALPHA, pub mod simulated);
15
16stability_scope!(BETA {
17    use commonware_cryptography::PublicKey;
18    use commonware_runtime::{IoBuf, IoBufs};
19    use commonware_utils::{
20        channel::mpsc,
21        ordered::{Map, Set},
22    };
23    use std::{error::Error as StdError, fmt::Debug, future::Future, time::SystemTime};
24
25    pub mod authenticated;
26    pub mod types;
27    pub mod utils;
28
29    pub use types::{Address, Ingress};
30
31    /// Tuple representing a message received from a given public key.
32    ///
33    /// This message is guaranteed to adhere to the configuration of the channel and
34    /// will already be decrypted and authenticated.
35    pub type Message<P> = (P, IoBuf);
36
37    /// Alias for identifying communication channels.
38    pub type Channel = u64;
39
40    /// Enum indicating the set of recipients to send a message to.
41    #[derive(Clone, Debug)]
42    pub enum Recipients<P: PublicKey> {
43        All,
44        Some(Vec<P>),
45        One(P),
46    }
47
48    /// Interface for sending messages to a set of recipients without rate-limiting restrictions.
49    pub trait UnlimitedSender: Clone + Send + Sync + 'static {
50        /// Public key type used to identify recipients.
51        type PublicKey: PublicKey;
52
53        /// Error that can occur when sending a message.
54        type Error: Debug + StdError + Send + Sync + 'static;
55
56        /// Sends a message to a set of recipients.
57        ///
58        /// # Offline Recipients
59        ///
60        /// If a recipient is offline at the time a message is sent, the message
61        /// will be dropped. It is up to the application to handle retries (if
62        /// necessary).
63        ///
64        /// # Returns
65        ///
66        /// A vector of recipients that the message was sent to, or an error if the
67        /// message could not be sent due to a validation failure (e.g., too large).
68        ///
69        /// Note: a successful send does not guarantee that the recipient will
70        /// receive the message.
71        ///
72        /// # Graceful Shutdown
73        ///
74        /// Implementations must handle internal channel closures gracefully during
75        /// shutdown. If the underlying network is shutting down, this method should
76        /// return `Ok` (possibly with an empty or partial recipient list) rather
77        /// than an error. Errors should only be returned for validation failures
78        /// that the caller can act upon.
79        fn send(
80            &mut self,
81            recipients: Recipients<Self::PublicKey>,
82            message: impl Into<IoBufs> + Send,
83            priority: bool,
84        ) -> impl Future<Output = Result<Vec<Self::PublicKey>, Self::Error>> + Send;
85    }
86
87    /// Interface for constructing a [`CheckedSender`] from a set of [`Recipients`],
88    /// filtering out any that are currently rate-limited.
89    pub trait LimitedSender: Clone + Send + Sync + 'static {
90        /// Public key type used to identify recipients.
91        type PublicKey: PublicKey;
92
93        /// The type of [`CheckedSender`] returned after checking recipients.
94        type Checked<'a>: CheckedSender<PublicKey = Self::PublicKey> + Send
95        where
96            Self: 'a;
97
98        /// Checks which recipients are within their rate limit and returns a
99        /// [`CheckedSender`] for sending to them.
100        ///
101        /// # Rate Limiting
102        ///
103        /// Recipients that exceed their rate limit will be filtered out. The
104        /// returned [`CheckedSender`] will only send to non-limited recipients.
105        ///
106        /// # Returns
107        ///
108        /// A [`CheckedSender`] containing only the recipients that are not
109        /// currently rate-limited, or an error with the earliest instant at which
110        /// all recipients will be available if all are rate-limited.
111        fn check<'a>(
112            &'a mut self,
113            recipients: Recipients<Self::PublicKey>,
114        ) -> impl Future<Output = Result<Self::Checked<'a>, SystemTime>> + Send;
115    }
116
117    /// Interface for sending messages to [`Recipients`] that are not currently rate-limited.
118    pub trait CheckedSender: Send {
119        /// Public key type used to identify [`Recipients`].
120        type PublicKey: PublicKey;
121
122        /// Error that can occur when sending a message.
123        type Error: Debug + StdError + Send + Sync + 'static;
124
125        /// Sends a message to the pre-checked recipients.
126        ///
127        /// # Offline Recipients
128        ///
129        /// If a recipient is offline at the time a message is sent, the message
130        /// will be dropped. It is up to the application to handle retries (if
131        /// necessary).
132        ///
133        /// # Returns
134        ///
135        /// A vector of recipients that the message was sent to, or an error if the
136        /// message could not be sent due to a validation failure (e.g., too large).
137        ///
138        /// Note: a successful send does not guarantee that the recipient will
139        /// receive the message.
140        ///
141        /// # Graceful Shutdown
142        ///
143        /// Implementations must handle internal channel closures gracefully during
144        /// shutdown. If the underlying network is shutting down, this method should
145        /// return `Ok` (possibly with an empty or partial recipient list) rather
146        /// than an error. Errors should only be returned for validation failures
147        /// that the caller can act upon.
148        fn send(
149            self,
150            message: impl Into<IoBufs> + Send,
151            priority: bool,
152        ) -> impl Future<Output = Result<Vec<Self::PublicKey>, Self::Error>> + Send;
153    }
154
155    /// Interface for sending messages to a set of recipients.
156    pub trait Sender: LimitedSender {
157        /// Sends a message to a set of recipients.
158        ///
159        /// # Offline Recipients
160        ///
161        /// If a recipient is offline at the time a message is sent, the message
162        /// will be dropped. It is up to the application to handle retries (if
163        /// necessary).
164        ///
165        /// # Rate Limiting
166        ///
167        /// Recipients that exceed their rate limit will be skipped. The message is
168        /// still sent to non-limited recipients. Check the returned vector to see
169        /// which peers were sent the message.
170        ///
171        /// # Returns
172        ///
173        /// A vector of recipients that the message was sent to, or an error if the
174        /// message could not be sent due to a validation failure (e.g., too large).
175        ///
176        /// Note: a successful send does not guarantee that the recipient will
177        /// receive the message.
178        ///
179        /// # Graceful Shutdown
180        ///
181        /// Implementations must handle internal channel closures gracefully during
182        /// shutdown. If the underlying network is shutting down, this method should
183        /// return `Ok` (possibly with an empty or partial recipient list) rather
184        /// than an error. Errors should only be returned for validation failures
185        /// that the caller can act upon.
186        fn send(
187            &mut self,
188            recipients: Recipients<Self::PublicKey>,
189            message: impl Into<IoBufs> + Send,
190            priority: bool,
191        ) -> impl Future<
192            Output = Result<Vec<Self::PublicKey>, <Self::Checked<'_> as CheckedSender>::Error>,
193        > + Send {
194            async move {
195                match self.check(recipients).await {
196                    Ok(checked_sender) => checked_sender.send(message, priority).await,
197                    Err(_) => Ok(Vec::new()),
198                }
199            }
200        }
201    }
202
203    // Blanket implementation of `Sender` for all `LimitedSender`s.
204    impl<S: LimitedSender> Sender for S {}
205
206    /// Interface for receiving messages from arbitrary recipients.
207    pub trait Receiver: Debug + Send + 'static {
208        /// Error that can occur when receiving a message.
209        type Error: Debug + StdError + Send + Sync;
210
211        /// Public key type used to identify recipients.
212        type PublicKey: PublicKey;
213
214        /// Receive a message from an arbitrary recipient.
215        fn recv(
216            &mut self,
217        ) -> impl Future<Output = Result<Message<Self::PublicKey>, Self::Error>> + Send;
218    }
219
220    /// Notification sent to subscribers when a peer set changes.
221    #[derive(Clone, Debug)]
222    pub struct PeerSetUpdate<P: PublicKey> {
223        /// The index of the peer set that changed.
224        pub index: u64,
225        /// The primary and secondary peers in the new set.
226        pub latest: TrackedPeers<P>,
227        /// Union of primary and secondary peers across all tracked peer sets.
228        pub all: TrackedPeers<P>,
229    }
230
231    /// Alias for the subscription type returned by [`Provider::subscribe`].
232    pub type PeerSetSubscription<P> = mpsc::UnboundedReceiver<PeerSetUpdate<P>>;
233
234    /// Primary and secondary peers provided together to [`Manager::track`].
235    ///
236    /// The same public key may appear in both `primary` and `secondary`. [`Manager::track`]
237    /// deduplicates overlapping keys, storing them as primary only.
238    #[derive(Clone, Debug, PartialEq, Eq)]
239    pub struct TrackedPeers<P: PublicKey> {
240        /// Peers eligible for primary-only policies.
241        pub primary: Set<P>,
242        /// Peers eligible for secondary-only policies.
243        pub secondary: Set<P>,
244    }
245
246    impl<P: PublicKey> TrackedPeers<P> {
247        pub const fn new(primary: Set<P>, secondary: Set<P>) -> Self {
248            Self { primary, secondary }
249        }
250
251        pub fn primary(primary: Set<P>) -> Self {
252            Self::new(primary, Set::default())
253        }
254
255        /// Returns the deduplicated union of primary and secondary peers.
256        pub fn union(self) -> Set<P> {
257            Set::from_iter_dedup(self.primary.into_iter().chain(self.secondary))
258        }
259    }
260
261    impl<P: PublicKey> From<Set<P>> for TrackedPeers<P> {
262        fn from(primary: Set<P>) -> Self {
263            Self::primary(primary)
264        }
265    }
266
267    impl<P: PublicKey> Default for TrackedPeers<P> {
268        fn default() -> Self {
269            Self::new(Set::default(), Set::default())
270        }
271    }
272
273    /// Primary and secondary peers provided together to [`AddressableManager::track`].
274    ///
275    /// The same public key may appear in both maps. [`AddressableManager::track`]
276    /// deduplicates overlapping keys, storing them as primary only.
277    #[derive(Clone, Debug)]
278    pub struct AddressableTrackedPeers<P: PublicKey> {
279        /// Addresses for peers eligible for primary-only policies.
280        pub primary: Map<P, Address>,
281        /// Addresses for peers eligible for secondary-only policies.
282        pub secondary: Map<P, Address>,
283    }
284
285    impl<P: PublicKey> AddressableTrackedPeers<P> {
286        pub const fn new(primary: Map<P, Address>, secondary: Map<P, Address>) -> Self {
287            Self { primary, secondary }
288        }
289
290        pub fn primary(primary: Map<P, Address>) -> Self {
291            Self::new(primary, Map::default())
292        }
293    }
294
295    impl<P: PublicKey> From<Map<P, Address>> for AddressableTrackedPeers<P> {
296        fn from(primary: Map<P, Address>) -> Self {
297            Self::primary(primary)
298        }
299    }
300
301    /// Interface for reading peer set information.
302    pub trait Provider: Debug + Clone + Send + 'static {
303        /// Public key type used to identify peers.
304        type PublicKey: PublicKey;
305
306        /// Fetch the primary and secondary peers tracked at the given ID.
307        fn peer_set(
308            &mut self,
309            id: u64,
310        ) -> impl Future<Output = Option<TrackedPeers<Self::PublicKey>>> + Send;
311
312        /// Subscribe to notifications when new peer sets are added.
313        ///
314        /// Returns a receiver of [`PeerSetUpdate`] notifications. Each update's
315        /// `latest` reflects how [`Manager::track`] stored the set: a peer listed in
316        /// both roles appears only under `latest.primary`. The `all` field aggregates
317        /// across tracked sets with the same rule (secondary excludes keys present as primary).
318        fn subscribe(
319            &mut self,
320        ) -> impl Future<Output = PeerSetSubscription<Self::PublicKey>> + Send;
321    }
322
323    /// Interface for managing peer set membership (where peer addresses are not known).
324    pub trait Manager: Provider {
325        /// Track a primary and secondary peer set with the given ID.
326        ///
327        /// The peer set ID passed to this function should be strictly managed, ideally matching the epoch
328        /// of the consensus engine. It must be monotonically increasing as new peer sets are
329        /// tracked.
330        ///
331        /// For good connectivity, all peers must track the same peer sets at the same ID.
332        ///
333        /// Callers may pass either a list of primary peers or a [`TrackedPeers`] value containing both primary and secondary peers.
334        ///
335        /// Overlapping keys in [`TrackedPeers`] are allowed; they are deduplicated as primary only.
336        ///
337        /// ## Active Peers
338        ///
339        /// The most recently registered peer set (highest ID) is considered the
340        /// active set. Implementations use the active set to decide which peers to
341        /// maintain connections with and which to disconnect from.
342        ///
343        /// ## Primary vs Secondary Peers
344        ///
345        /// In p2p networks, there are often two tiers of peers: ones that help "drive progress" and ones that want to
346        /// "follow that progress" (but not contribute to it). We call the former "primary" and the latter "secondary".
347        /// When both are tracked, mechanisms favor "primary" peers but continue to replicate data to "secondary" peers (
348        /// often both gossiping data to them and answering requests from them).
349        fn track<R>(
350            &mut self,
351            id: u64,
352            peers: R,
353        ) -> impl Future<Output = ()> + Send
354        where
355            R: Into<TrackedPeers<Self::PublicKey>> + Send;
356    }
357
358    /// Interface for managing peer set membership (where peer addresses are known).
359    pub trait AddressableManager: Provider {
360        /// Track a primary peer set and secondary peers with the given ID.
361        ///
362        /// The peer set ID passed to this function should be strictly managed, ideally matching the epoch
363        /// of the consensus engine. It must be monotonically increasing as new peer sets are
364        /// tracked.
365        ///
366        /// For good connectivity, all peers must track the same peer sets at the same ID.
367        ///
368        /// Callers may pass either a list of primary peers or a [`AddressableTrackedPeers`] value containing
369        /// both primary and secondary peers.
370        ///
371        /// The same key may appear in both maps; see [`AddressableTrackedPeers`].
372        ///
373        /// ## Active Peers
374        ///
375        /// The most recently registered peer set (highest ID) is considered the
376        /// active set. Implementations use the active set to decide which peers to
377        /// maintain connections with and which to disconnect from.
378        ///
379        /// ## Primary vs Secondary Peers
380        ///
381        /// In p2p networks, there are often two tiers of peers: ones that help "drive progress" and ones that want to
382        /// "follow that progress" (but not contribute to it). We call the former "primary" and the latter "secondary".
383        /// When both are tracked, mechanisms favor "primary" peers but continue to replicate data to "secondary" peers (
384        /// often both gossiping data to them and answering requests from them).
385        fn track<R>(
386            &mut self,
387            id: u64,
388            peers: R,
389        ) -> impl Future<Output = ()> + Send
390        where
391            R: Into<AddressableTrackedPeers<Self::PublicKey>> + Send;
392
393        /// Update addresses for multiple peers without creating a new peer set.
394        ///
395        /// For each primary or secondary peer with a changed address:
396        /// - Any existing connection to the peer is severed (it was on the old IP)
397        /// - The listener's allowed IPs are updated to reflect the new egress IP
398        /// - Future connections will use the new address
399        fn overwrite(
400            &mut self,
401            peers: Map<Self::PublicKey, Address>,
402        ) -> impl Future<Output = ()> + Send;
403    }
404
405    /// Interface for blocking other peers.
406    pub trait Blocker: Clone + Send + 'static {
407        /// Public key type used to identify peers.
408        type PublicKey: PublicKey;
409
410        /// Block a peer, disconnecting them if currently connected and preventing future connections.
411        fn block(&mut self, peer: Self::PublicKey) -> impl Future<Output = ()> + Send;
412    }
413});
414
415/// Logs a warning and blocks a peer in a single call.
416///
417/// This macro combines a [`tracing::warn!`] with a [`Blocker::block`] call
418/// to ensure consistent logging at every block site. The peer is always
419/// included as a `peer` field in the log output.
420///
421/// # Examples
422///
423/// ```ignore
424/// block!(self.blocker, sender, "invalid message");
425/// block!(self.blocker, sender, ?err, "invalid ack signature");
426/// block!(self.blocker, sender, %view, "blocking peer for epoch mismatch");
427/// ```
428#[cfg(not(any(
429    commonware_stability_GAMMA,
430    commonware_stability_DELTA,
431    commonware_stability_EPSILON,
432    commonware_stability_RESERVED
433)))] // BETA
434#[macro_export]
435macro_rules! block {
436    ($blocker:expr, $peer:expr, $($arg:tt)+) => {
437        let peer = $peer;
438        tracing::warn!(peer = ?peer, $($arg)+);
439        #[allow(clippy::disallowed_methods)]
440        $blocker.block(peer).await;
441    };
442}
443
444/// Block a peer without logging.
445#[allow(
446    clippy::disallowed_methods,
447    reason = "test helper that bypasses the block! macro"
448)]
449#[cfg(test)]
450pub async fn block_peer<B: Blocker>(blocker: &mut B, peer: B::PublicKey) {
451    blocker.block(peer).await;
452}