Skip to main content

commonware_p2p/
lib.rs

1//! Communicate with authenticated peers over encrypted connections.
2//!
3//! # Status
4//!
5//! Stability varies by primitive. See [README](https://github.com/commonwarexyz/monorepo#stability) for details.
6
7#![doc(
8    html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
9    html_favicon_url = "https://commonware.xyz/favicon.ico"
10)]
11
12use commonware_macros::{stability_mod, stability_scope};
13
14stability_mod!(ALPHA, pub mod simulated);
15
16stability_scope!(BETA {
17    use commonware_cryptography::PublicKey;
18    use commonware_runtime::{IoBuf, IoBufs};
19    use commonware_utils::{
20        channel::mpsc,
21        ordered::{Map, Set},
22    };
23    use std::{error::Error as StdError, fmt::Debug, future::Future, time::SystemTime};
24
25    pub mod authenticated;
26    pub mod types;
27    pub mod utils;
28
29    pub use types::{Address, Ingress};
30
31    /// Tuple representing a message received from a given public key.
32    ///
33    /// This message is guaranteed to adhere to the configuration of the channel and
34    /// will already be decrypted and authenticated.
35    pub type Message<P> = (P, IoBuf);
36
37    /// Alias for identifying communication channels.
38    pub type Channel = u64;
39
40    /// Enum indicating the set of recipients to send a message to.
41    #[derive(Clone, Debug)]
42    pub enum Recipients<P: PublicKey> {
43        All,
44        Some(Vec<P>),
45        One(P),
46    }
47
48    /// Interface for sending messages to a set of recipients without rate-limiting restrictions.
49    pub trait UnlimitedSender: Clone + Send + Sync + 'static {
50        /// Public key type used to identify recipients.
51        type PublicKey: PublicKey;
52
53        /// Error that can occur when sending a message.
54        type Error: Debug + StdError + Send + Sync + 'static;
55
56        /// Sends a message to a set of recipients.
57        ///
58        /// # Offline Recipients
59        ///
60        /// If a recipient is offline at the time a message is sent, the message
61        /// will be dropped. It is up to the application to handle retries (if
62        /// necessary).
63        ///
64        /// # Returns
65        ///
66        /// A vector of recipients that the message was sent to, or an error if the
67        /// message could not be sent due to a validation failure (e.g., too large).
68        ///
69        /// Note: a successful send does not guarantee that the recipient will
70        /// receive the message.
71        ///
72        /// # Graceful Shutdown
73        ///
74        /// Implementations must handle internal channel closures gracefully during
75        /// shutdown. If the underlying network is shutting down, this method should
76        /// return `Ok` (possibly with an empty or partial recipient list) rather
77        /// than an error. Errors should only be returned for validation failures
78        /// that the caller can act upon.
79        fn send(
80            &mut self,
81            recipients: Recipients<Self::PublicKey>,
82            message: impl Into<IoBufs> + Send,
83            priority: bool,
84        ) -> impl Future<Output = Result<Vec<Self::PublicKey>, Self::Error>> + Send;
85    }
86
87    /// Interface for constructing a [`CheckedSender`] from a set of [`Recipients`],
88    /// filtering out any that are currently rate-limited.
89    pub trait LimitedSender: Clone + Send + Sync + 'static {
90        /// Public key type used to identify recipients.
91        type PublicKey: PublicKey;
92
93        /// The type of [`CheckedSender`] returned after checking recipients.
94        type Checked<'a>: CheckedSender<PublicKey = Self::PublicKey> + Send
95        where
96            Self: 'a;
97
98        /// Checks which recipients are within their rate limit and returns a
99        /// [`CheckedSender`] for sending to them.
100        ///
101        /// # Rate Limiting
102        ///
103        /// Recipients that exceed their rate limit will be filtered out. The
104        /// returned [`CheckedSender`] will only send to non-limited recipients.
105        ///
106        /// # Returns
107        ///
108        /// A [`CheckedSender`] containing only the recipients that are not
109        /// currently rate-limited, or an error with the earliest instant at which
110        /// all recipients will be available if all are rate-limited.
111        fn check<'a>(
112            &'a mut self,
113            recipients: Recipients<Self::PublicKey>,
114        ) -> impl Future<Output = Result<Self::Checked<'a>, SystemTime>> + Send;
115    }
116
117    /// Interface for sending messages to [`Recipients`] that are not currently rate-limited.
118    pub trait CheckedSender: Send {
119        /// Public key type used to identify [`Recipients`].
120        type PublicKey: PublicKey;
121
122        /// Error that can occur when sending a message.
123        type Error: Debug + StdError + Send + Sync + 'static;
124
125        /// Sends a message to the pre-checked recipients.
126        ///
127        /// # Offline Recipients
128        ///
129        /// If a recipient is offline at the time a message is sent, the message
130        /// will be dropped. It is up to the application to handle retries (if
131        /// necessary).
132        ///
133        /// # Returns
134        ///
135        /// A vector of recipients that the message was sent to, or an error if the
136        /// message could not be sent due to a validation failure (e.g., too large).
137        ///
138        /// Note: a successful send does not guarantee that the recipient will
139        /// receive the message.
140        ///
141        /// # Graceful Shutdown
142        ///
143        /// Implementations must handle internal channel closures gracefully during
144        /// shutdown. If the underlying network is shutting down, this method should
145        /// return `Ok` (possibly with an empty or partial recipient list) rather
146        /// than an error. Errors should only be returned for validation failures
147        /// that the caller can act upon.
148        fn send(
149            self,
150            message: impl Into<IoBufs> + Send,
151            priority: bool,
152        ) -> impl Future<Output = Result<Vec<Self::PublicKey>, Self::Error>> + Send;
153    }
154
155    /// Interface for sending messages to a set of recipients.
156    pub trait Sender: LimitedSender {
157        /// Sends a message to a set of recipients.
158        ///
159        /// # Offline Recipients
160        ///
161        /// If a recipient is offline at the time a message is sent, the message
162        /// will be dropped. It is up to the application to handle retries (if
163        /// necessary).
164        ///
165        /// # Rate Limiting
166        ///
167        /// Recipients that exceed their rate limit will be skipped. The message is
168        /// still sent to non-limited recipients. Check the returned vector to see
169        /// which peers were sent the message.
170        ///
171        /// # Returns
172        ///
173        /// A vector of recipients that the message was sent to, or an error if the
174        /// message could not be sent due to a validation failure (e.g., too large).
175        ///
176        /// Note: a successful send does not guarantee that the recipient will
177        /// receive the message.
178        ///
179        /// # Graceful Shutdown
180        ///
181        /// Implementations must handle internal channel closures gracefully during
182        /// shutdown. If the underlying network is shutting down, this method should
183        /// return `Ok` (possibly with an empty or partial recipient list) rather
184        /// than an error. Errors should only be returned for validation failures
185        /// that the caller can act upon.
186        fn send(
187            &mut self,
188            recipients: Recipients<Self::PublicKey>,
189            message: impl Into<IoBufs> + Send,
190            priority: bool,
191        ) -> impl Future<
192            Output = Result<Vec<Self::PublicKey>, <Self::Checked<'_> as CheckedSender>::Error>,
193        > + Send {
194            async move {
195                match self.check(recipients).await {
196                    Ok(checked_sender) => checked_sender.send(message, priority).await,
197                    Err(_) => Ok(Vec::new()),
198                }
199            }
200        }
201    }
202
203    // Blanket implementation of `Sender` for all `LimitedSender`s.
204    impl<S: LimitedSender> Sender for S {}
205
206    /// Interface for receiving messages from arbitrary recipients.
207    pub trait Receiver: Debug + Send + 'static {
208        /// Error that can occur when receiving a message.
209        type Error: Debug + StdError + Send + Sync;
210
211        /// Public key type used to identify recipients.
212        type PublicKey: PublicKey;
213
214        /// Receive a message from an arbitrary recipient.
215        fn recv(
216            &mut self,
217        ) -> impl Future<Output = Result<Message<Self::PublicKey>, Self::Error>> + Send;
218    }
219
220    /// Alias for the subscription type returned by [`Provider::subscribe`].
221    pub type PeerSetSubscription<P> = mpsc::UnboundedReceiver<(u64, Set<P>, Set<P>)>;
222
223    /// Interface for reading peer set information.
224    pub trait Provider: Debug + Clone + Send + 'static {
225        /// Public key type used to identify peers.
226        type PublicKey: PublicKey;
227
228        /// Fetch the ordered set of peers for a given ID.
229        fn peer_set(
230            &mut self,
231            id: u64,
232        ) -> impl Future<Output = Option<Set<Self::PublicKey>>> + Send;
233
234        /// Subscribe to notifications when new peer sets are added.
235        ///
236        /// Returns a receiver that will receive tuples of:
237        /// - The peer set ID
238        /// - The peers in the new set
239        /// - All currently tracked peers (union of recent peer sets)
240        #[allow(clippy::type_complexity)]
241        fn subscribe(
242            &mut self,
243        ) -> impl Future<Output = PeerSetSubscription<Self::PublicKey>> + Send;
244    }
245
246    /// Interface for managing peer set membership (where peer addresses are not known).
247    pub trait Manager: Provider {
248        /// Track a peer set with the given ID and peers.
249        ///
250        /// The peer set ID passed to this function should be strictly managed, ideally matching the epoch
251        /// of the consensus engine. It must be monotonically increasing as new peer sets are tracked.
252        ///
253        /// For good connectivity, all peers must track the same peer sets at the same ID.
254        fn track(
255            &mut self,
256            id: u64,
257            peers: Set<Self::PublicKey>,
258        ) -> impl Future<Output = ()> + Send;
259    }
260
261    /// Interface for managing peer set membership (where peer addresses are known).
262    pub trait AddressableManager: Provider {
263        /// Track a peer set with the given ID and peer<PublicKey, Address> pairs.
264        ///
265        /// The peer set ID passed to this function should be strictly managed, ideally matching the epoch
266        /// of the consensus engine. It must be monotonically increasing as new peer sets are tracked.
267        ///
268        /// For good connectivity, all peers must track the same peer sets at the same ID.
269        fn track(
270            &mut self,
271            id: u64,
272            peers: Map<Self::PublicKey, Address>,
273        ) -> impl Future<Output = ()> + Send;
274
275        /// Update addresses for multiple peers without creating a new peer set.
276        ///
277        /// For each peer that is tracked and has a changed address:
278        /// - Any existing connection to the peer is severed (it was on the old IP)
279        /// - The listener's allowed IPs are updated to reflect the new egress IP
280        /// - Future connections will use the new address
281        fn overwrite(
282            &mut self,
283            peers: Map<Self::PublicKey, Address>,
284        ) -> impl Future<Output = ()> + Send;
285    }
286
287    /// Interface for blocking other peers.
288    pub trait Blocker: Clone + Send + 'static {
289        /// Public key type used to identify peers.
290        type PublicKey: PublicKey;
291
292        /// Block a peer, disconnecting them if currently connected and preventing future connections.
293        fn block(&mut self, peer: Self::PublicKey) -> impl Future<Output = ()> + Send;
294    }
295});
296
/// Logs a warning and blocks a peer in a single call.
///
/// This macro combines a [`tracing::warn!`] with a [`Blocker::block`] call
/// to ensure consistent logging at every block site. The peer is always
/// included as a `peer` field in the log output.
///
/// # Arguments
///
/// - `$blocker`: expression on which `.block(peer)` is called.
/// - `$peer`: the peer to block; evaluated exactly once.
/// - remaining tokens: forwarded verbatim to [`tracing::warn!`] (fields and message).
///
/// # Usage Notes
///
/// - The expansion uses `.await`, so the macro must be invoked in an async context.
/// - The expansion refers to `tracing::warn!` by path, so `tracing` must be resolvable
///   at the call site.
/// - The expansion is a single block expression of type `()`. It can therefore be used
///   both as a statement and in expression position (e.g., directly as a `match` arm).
///
/// # Examples
///
/// ```ignore
/// block!(self.blocker, sender, "invalid message");
/// block!(self.blocker, sender, ?err, "invalid ack signature");
/// block!(self.blocker, sender, %view, "blocking peer for epoch mismatch");
///
/// // Expression position works as well:
/// match verify(&msg) {
///     Ok(()) => {}
///     Err(err) => block!(self.blocker, sender, ?err, "invalid message"),
/// }
/// ```
#[cfg(not(any(
    commonware_stability_GAMMA,
    commonware_stability_DELTA,
    commonware_stability_EPSILON,
    commonware_stability_RESERVED
)))] // BETA
#[macro_export]
macro_rules! block {
    ($blocker:expr, $peer:expr, $($arg:tt)+) => {{
        // Bind once so `$peer` is evaluated a single time and the very same value is
        // both logged and blocked. The surrounding block keeps this binding out of the
        // caller's scope and makes the expansion a valid expression.
        let peer = $peer;
        tracing::warn!(peer = ?peer, $($arg)+);
        // This macro is the intended place to call `block`, so the lint that flags
        // direct calls is silenced here.
        #[allow(clippy::disallowed_methods)]
        $blocker.block(peer).await;
    }};
}
325
/// Blocks `peer` through `blocker` without emitting any log output.
#[allow(
    clippy::disallowed_methods,
    reason = "test helper that bypasses the block! macro"
)]
#[cfg(test)]
pub async fn block_peer<B>(blocker: &mut B, peer: B::PublicKey)
where
    B: Blocker,
{
    blocker.block(peer).await;
}