Skip to main content

commonware_p2p/
lib.rs

//! Communicate with authenticated peers over encrypted connections.
//!
//! # Status
//!
//! Stability varies by primitive. See [README](https://github.com/commonwarexyz/monorepo#stability) for details.
6
7#![doc(
8    html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
9    html_favicon_url = "https://commonware.xyz/favicon.ico"
10)]
11
12use commonware_macros::{stability_mod, stability_scope};
13
14stability_mod!(ALPHA, pub mod simulated);
15
16stability_scope!(BETA {
17    use commonware_cryptography::PublicKey;
18    use commonware_runtime::{IoBuf, IoBufMut};
19    use commonware_utils::{channel::mpsc, ordered::{Map, Set}};
20    use std::{error::Error as StdError, fmt::Debug, future::Future, time::SystemTime};
21
22    pub mod authenticated;
23    pub mod types;
24    pub mod utils;
25
26    pub use types::{Address, Ingress};
27
28    /// Tuple representing a message received from a given public key.
29    ///
30    /// This message is guaranteed to adhere to the configuration of the channel and
31    /// will already be decrypted and authenticated.
32    pub type Message<P> = (P, IoBuf);
33
34    /// Alias for identifying communication channels.
35    pub type Channel = u64;
36
37    /// Enum indicating the set of recipients to send a message to.
38    #[derive(Clone, Debug)]
39    pub enum Recipients<P: PublicKey> {
40        All,
41        Some(Vec<P>),
42        One(P),
43    }
44
45    /// Interface for sending messages to a set of recipients without rate-limiting restrictions.
46    pub trait UnlimitedSender: Clone + Send + Sync + 'static {
47        /// Public key type used to identify recipients.
48        type PublicKey: PublicKey;
49
50        /// Error that can occur when sending a message.
51        type Error: Debug + StdError + Send + Sync + 'static;
52
53        /// Sends a message to a set of recipients.
54        ///
55        /// # Offline Recipients
56        ///
57        /// If a recipient is offline at the time a message is sent, the message
58        /// will be dropped. It is up to the application to handle retries (if
59        /// necessary).
60        ///
61        /// # Returns
62        ///
63        /// A vector of recipients that the message was sent to, or an error if the
64        /// message could not be sent due to a validation failure (e.g., too large).
65        ///
66        /// Note: a successful send does not guarantee that the recipient will
67        /// receive the message.
68        ///
69        /// # Graceful Shutdown
70        ///
71        /// Implementations must handle internal channel closures gracefully during
72        /// shutdown. If the underlying network is shutting down, this method should
73        /// return `Ok` (possibly with an empty or partial recipient list) rather
74        /// than an error. Errors should only be returned for validation failures
75        /// that the caller can act upon.
76        fn send(
77            &mut self,
78            recipients: Recipients<Self::PublicKey>,
79            message: impl Into<IoBufMut> + Send,
80            priority: bool,
81        ) -> impl Future<Output = Result<Vec<Self::PublicKey>, Self::Error>> + Send;
82    }
83
84    /// Interface for constructing a [`CheckedSender`] from a set of [`Recipients`],
85    /// filtering out any that are currently rate-limited.
86    pub trait LimitedSender: Clone + Send + Sync + 'static {
87        /// Public key type used to identify recipients.
88        type PublicKey: PublicKey;
89
90        /// The type of [`CheckedSender`] returned after checking recipients.
91        type Checked<'a>: CheckedSender<PublicKey = Self::PublicKey> + Send
92        where
93            Self: 'a;
94
95        /// Checks which recipients are within their rate limit and returns a
96        /// [`CheckedSender`] for sending to them.
97        ///
98        /// # Rate Limiting
99        ///
100        /// Recipients that exceed their rate limit will be filtered out. The
101        /// returned [`CheckedSender`] will only send to non-limited recipients.
102        ///
103        /// # Returns
104        ///
105        /// A [`CheckedSender`] containing only the recipients that are not
106        /// currently rate-limited, or an error with the earliest instant at which
107        /// all recipients will be available if all are rate-limited.
108        fn check<'a>(
109            &'a mut self,
110            recipients: Recipients<Self::PublicKey>,
111        ) -> impl Future<Output = Result<Self::Checked<'a>, SystemTime>> + Send;
112    }
113
114    /// Interface for sending messages to [`Recipients`] that are not currently rate-limited.
115    pub trait CheckedSender: Send {
116        /// Public key type used to identify [`Recipients`].
117        type PublicKey: PublicKey;
118
119        /// Error that can occur when sending a message.
120        type Error: Debug + StdError + Send + Sync + 'static;
121
122        /// Sends a message to the pre-checked recipients.
123        ///
124        /// # Offline Recipients
125        ///
126        /// If a recipient is offline at the time a message is sent, the message
127        /// will be dropped. It is up to the application to handle retries (if
128        /// necessary).
129        ///
130        /// # Returns
131        ///
132        /// A vector of recipients that the message was sent to, or an error if the
133        /// message could not be sent due to a validation failure (e.g., too large).
134        ///
135        /// Note: a successful send does not guarantee that the recipient will
136        /// receive the message.
137        ///
138        /// # Graceful Shutdown
139        ///
140        /// Implementations must handle internal channel closures gracefully during
141        /// shutdown. If the underlying network is shutting down, this method should
142        /// return `Ok` (possibly with an empty or partial recipient list) rather
143        /// than an error. Errors should only be returned for validation failures
144        /// that the caller can act upon.
145        fn send(
146            self,
147            message: impl Into<IoBufMut> + Send,
148            priority: bool,
149        ) -> impl Future<Output = Result<Vec<Self::PublicKey>, Self::Error>> + Send;
150    }
151
152    /// Interface for sending messages to a set of recipients.
153    pub trait Sender: LimitedSender {
154        /// Sends a message to a set of recipients.
155        ///
156        /// # Offline Recipients
157        ///
158        /// If a recipient is offline at the time a message is sent, the message
159        /// will be dropped. It is up to the application to handle retries (if
160        /// necessary).
161        ///
162        /// # Rate Limiting
163        ///
164        /// Recipients that exceed their rate limit will be skipped. The message is
165        /// still sent to non-limited recipients. Check the returned vector to see
166        /// which peers were sent the message.
167        ///
168        /// # Returns
169        ///
170        /// A vector of recipients that the message was sent to, or an error if the
171        /// message could not be sent due to a validation failure (e.g., too large).
172        ///
173        /// Note: a successful send does not guarantee that the recipient will
174        /// receive the message.
175        ///
176        /// # Graceful Shutdown
177        ///
178        /// Implementations must handle internal channel closures gracefully during
179        /// shutdown. If the underlying network is shutting down, this method should
180        /// return `Ok` (possibly with an empty or partial recipient list) rather
181        /// than an error. Errors should only be returned for validation failures
182        /// that the caller can act upon.
183        fn send(
184            &mut self,
185            recipients: Recipients<Self::PublicKey>,
186            message: impl Into<IoBufMut> + Send,
187            priority: bool,
188        ) -> impl Future<
189            Output = Result<Vec<Self::PublicKey>, <Self::Checked<'_> as CheckedSender>::Error>,
190        > + Send {
191            async move {
192                match self.check(recipients).await {
193                    Ok(checked_sender) => checked_sender.send(message, priority).await,
194                    Err(_) => Ok(Vec::new()),
195                }
196            }
197        }
198    }
199
200    // Blanket implementation of `Sender` for all `LimitedSender`s.
201    impl<S: LimitedSender> Sender for S {}
202
203    /// Interface for receiving messages from arbitrary recipients.
204    pub trait Receiver: Debug + Send + 'static {
205        /// Error that can occur when receiving a message.
206        type Error: Debug + StdError + Send + Sync;
207
208        /// Public key type used to identify recipients.
209        type PublicKey: PublicKey;
210
211        /// Receive a message from an arbitrary recipient.
212        fn recv(
213            &mut self,
214        ) -> impl Future<Output = Result<Message<Self::PublicKey>, Self::Error>> + Send;
215    }
216
217    /// Interface for reading peer set information.
218    pub trait Provider: Debug + Clone + Send + 'static {
219        /// Public key type used to identify peers.
220        type PublicKey: PublicKey;
221
222        /// Fetch the ordered set of peers for a given ID.
223        fn peer_set(
224            &mut self,
225            id: u64,
226        ) -> impl Future<Output = Option<Set<Self::PublicKey>>> + Send;
227
228        /// Subscribe to notifications when new peer sets are added.
229        ///
230        /// Returns a receiver that will receive tuples of:
231        /// - The peer set ID
232        /// - The peers in the new set
233        /// - All currently tracked peers (union of recent peer sets)
234        #[allow(clippy::type_complexity)]
235        fn subscribe(
236            &mut self,
237        ) -> impl Future<
238            Output = mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)>,
239        > + Send;
240    }
241
242    /// Interface for managing peer set membership (where peer addresses are not known).
243    pub trait Manager: Provider {
244        /// Track a peer set with the given ID and peers.
245        ///
246        /// The peer set ID passed to this function should be strictly managed, ideally matching the epoch
247        /// of the consensus engine. It must be monotonically increasing as new peer sets are tracked.
248        ///
249        /// For good connectivity, all peers must track the same peer sets at the same ID.
250        fn track(
251            &mut self,
252            id: u64,
253            peers: Set<Self::PublicKey>,
254        ) -> impl Future<Output = ()> + Send;
255    }
256
257    /// Interface for managing peer set membership (where peer addresses are known).
258    pub trait AddressableManager: Provider {
259        /// Track a peer set with the given ID and peer<PublicKey, Address> pairs.
260        ///
261        /// The peer set ID passed to this function should be strictly managed, ideally matching the epoch
262        /// of the consensus engine. It must be monotonically increasing as new peer sets are tracked.
263        ///
264        /// For good connectivity, all peers must track the same peer sets at the same ID.
265        fn track(
266            &mut self,
267            id: u64,
268            peers: Map<Self::PublicKey, Address>,
269        ) -> impl Future<Output = ()> + Send;
270
271        /// Update addresses for multiple peers without creating a new peer set.
272        ///
273        /// For each peer that is tracked and has a changed address:
274        /// - Any existing connection to the peer is severed (it was on the old IP)
275        /// - The listener's allowed IPs are updated to reflect the new egress IP
276        /// - Future connections will use the new address
277        fn overwrite(
278            &mut self,
279            peers: Map<Self::PublicKey, Address>,
280        ) -> impl Future<Output = ()> + Send;
281    }
282
283    /// Interface for blocking other peers.
284    pub trait Blocker: Clone + Send + 'static {
285        /// Public key type used to identify peers.
286        type PublicKey: PublicKey;
287
288        /// Block a peer, disconnecting them if currently connected and preventing future connections.
289        fn block(&mut self, peer: Self::PublicKey) -> impl Future<Output = ()> + Send;
290    }
291});