Skip to main content

commonware_p2p/
lib.rs

1//! Communicate with authenticated peers over encrypted connections.
2//!
3//! # Status
4//!
5//! Stability varies by primitive. See [README](https://github.com/commonwarexyz/monorepo#stability) for details.
6
7#![doc(
8    html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
9    html_favicon_url = "https://commonware.xyz/favicon.ico"
10)]
11
12use commonware_macros::{stability_mod, stability_scope};
13
14stability_mod!(ALPHA, pub mod simulated);
15
16stability_scope!(BETA {
17    use commonware_actor::{Feedback, Unreliable};
18    use commonware_cryptography::PublicKey;
19    use commonware_runtime::{IoBuf, IoBufs};
20    use commonware_utils::{
21        channel::mpsc,
22        ordered::{Map, Set},
23    };
24    use std::{error::Error as StdError, fmt::Debug, future::Future, time::SystemTime};
25
26    pub mod authenticated;
27    pub mod types;
28    pub mod utils;
29
30    pub use types::{Address, Ingress};
31
    /// Tuple representing a message received from a given public key.
    ///
    /// The first element is the public key the message was received from and the
    /// second element is the message payload.
    ///
    /// This message is guaranteed to adhere to the configuration of the channel and
    /// will already be decrypted and authenticated.
    pub type Message<P> = (P, IoBuf);
37
    /// Alias for identifying communication channels.
    ///
    /// A channel is identified by a plain `u64`.
    pub type Channel = u64;
40
    /// Enum indicating the set of recipients to send a message to.
    #[derive(Clone, Debug)]
    pub enum Recipients<P: PublicKey> {
        /// Address the message to all peers (which peers count as "all" is
        /// decided by the sender implementation).
        All,
        /// Address the message to the listed peers.
        Some(Vec<P>),
        /// Address the message to a single peer.
        One(P),
    }
48
    /// Interface for sending messages to a set of recipients without rate-limiting restrictions.
    pub trait UnlimitedSender: Clone + Send + Sync + 'static {
        /// Public key type used to identify recipients.
        type PublicKey: PublicKey;

        /// Sends a message to a set of recipients.
        ///
        /// # Arguments
        ///
        /// * `recipients` - The peers the message is addressed to.
        /// * `message` - The payload; anything convertible into [`IoBufs`].
        /// * `priority` - Priority flag forwarded to the implementation
        ///   (NOTE(review): exact scheduling semantics are implementation-defined — confirm).
        ///
        /// # Offline Recipients
        ///
        /// If a recipient is offline at the time a message is sent, the message
        /// will be dropped. It is up to the application to handle retries (if
        /// necessary).
        ///
        /// # Returns
        ///
        /// Feedback from submitting the message for delivery.
        /// [`Unreliable`] indicates that local submission may be rejected under backpressure.
        /// [`Feedback::accepted`] does not guarantee that the recipient will receive the message.
        fn send(
            &mut self,
            recipients: Recipients<Self::PublicKey>,
            message: impl Into<IoBufs> + Send,
            priority: bool,
        ) -> Unreliable<Feedback>;
    }
74
    /// Interface for constructing a [`CheckedSender`] from a set of [`Recipients`],
    /// filtering out any that are currently rate-limited.
    pub trait LimitedSender: Clone + Send + Sync + 'static {
        /// Public key type used to identify recipients.
        type PublicKey: PublicKey;

        /// The type of [`CheckedSender`] returned after checking recipients.
        ///
        /// The lifetime parameter ties the checked sender to the `&mut self`
        /// borrow taken by [`LimitedSender::check`].
        type Checked<'a>: CheckedSender<PublicKey = Self::PublicKey> + Send
        where
            Self: 'a;

        /// Checks which recipients are within their rate limit and returns a
        /// [`CheckedSender`] for sending to them.
        ///
        /// # Rate Limiting
        ///
        /// Recipients that exceed their rate limit will be filtered out. The
        /// returned [`CheckedSender`] will only send to non-limited recipients.
        ///
        /// # Returns
        ///
        /// A [`CheckedSender`] containing only the recipients that are not
        /// currently rate-limited, or an error with the earliest instant at which
        /// all recipients will be available if all are rate-limited.
        fn check(
            &mut self,
            recipients: Recipients<Self::PublicKey>,
        ) -> Result<Self::Checked<'_>, SystemTime>;
    }
104
    /// Interface for sending messages to [`Recipients`] that are not currently rate-limited.
    pub trait CheckedSender: Send {
        /// Public key type used to identify [`Recipients`].
        type PublicKey: PublicKey;

        /// Returns the recipients retained by the check.
        fn recipients(&self) -> Vec<Self::PublicKey>;

        /// Sends a message to the pre-checked recipients.
        ///
        /// This method takes `self` by value, so a checked sender can submit
        /// at most one message.
        ///
        /// # Offline Recipients
        ///
        /// If a recipient is offline at the time a message is sent, the message
        /// will be dropped. It is up to the application to handle retries (if
        /// necessary).
        ///
        /// # Returns
        ///
        /// Feedback from submitting the message for delivery.
        /// [`Unreliable`] indicates that local submission may be rejected under backpressure.
        /// [`Feedback::accepted`] does not guarantee that the recipient will receive the message.
        fn send(self, message: impl Into<IoBufs> + Send, priority: bool) -> Unreliable<Feedback>;
    }
128
129    /// Interface for sending messages to a set of recipients.
130    pub trait Sender: LimitedSender {
131        /// Sends a message to a set of recipients.
132        ///
133        /// # Offline Recipients
134        ///
135        /// If a recipient is offline at the time a message is sent, the message
136        /// will be dropped. It is up to the application to handle retries (if
137        /// necessary).
138        ///
139        /// # Rate Limiting
140        ///
141        /// Recipients that exceed their rate limit will be skipped. The message is
142        /// still sent to non-limited recipients.
143        ///
144        /// # Returns
145        ///
146        /// The recipients we will attempt to send to. Returns an
147        /// empty list if all recipients are rate-limited, the sender has closed, or the send is
148        /// not accepted.
149        fn send(
150            &mut self,
151            recipients: Recipients<Self::PublicKey>,
152            message: impl Into<IoBufs> + Send,
153            priority: bool,
154        ) -> Vec<Self::PublicKey> {
155            self.check(recipients).map_or_else(
156                |_| Vec::new(),
157                |checked_sender| {
158                    let recipients = checked_sender.recipients();
159                    let feedback = checked_sender.send(message, priority);
160                    if feedback.accepted() {
161                        recipients
162                    } else {
163                        Vec::new()
164                    }
165                },
166            )
167        }
168    }
169
170    // Blanket implementation of `Sender` for all `LimitedSender`s.
171    impl<S: LimitedSender> Sender for S {}
172
    /// Interface for receiving messages from arbitrary senders.
    pub trait Receiver: Debug + Send + 'static {
        /// Error that can occur when receiving a message.
        type Error: Debug + StdError + Send + Sync;

        /// Public key type used to identify the sender of each received message.
        type PublicKey: PublicKey;

        /// Receive a message from an arbitrary sender.
        ///
        /// The returned future resolves to a [`Message`] (the sender's public key
        /// paired with the payload) or to [`Receiver::Error`].
        fn recv(
            &mut self,
        ) -> impl Future<Output = Result<Message<Self::PublicKey>, Self::Error>> + Send;
    }
186
    /// Notification sent to subscribers when a peer set changes.
    ///
    /// Delivered through the [`PeerSetSubscription`] returned by [`Provider::subscribe`].
    #[derive(Clone, Debug)]
    pub struct PeerSetUpdate<P: PublicKey> {
        /// The index of the peer set that changed.
        pub index: u64,
        /// The primary and secondary peers in the new set.
        pub latest: TrackedPeers<P>,
        /// Union of primary and secondary peers across all tracked peer sets.
        pub all: TrackedPeers<P>,
    }
197
    /// Alias for the subscription type returned by [`Provider::subscribe`].
    ///
    /// This is the receiving half of an unbounded channel carrying [`PeerSetUpdate`]s.
    pub type PeerSetSubscription<P> = mpsc::UnboundedReceiver<PeerSetUpdate<P>>;
200
    /// Primary and secondary peers provided together to [`Manager::track`].
    ///
    /// The same public key may appear in both `primary` and `secondary`. [`Manager::track`]
    /// deduplicates overlapping keys, storing them as primary only.
    ///
    /// The struct itself performs no deduplication between the two sets; both
    /// fields are stored exactly as given.
    #[derive(Clone, Debug, PartialEq, Eq)]
    pub struct TrackedPeers<P: PublicKey> {
        /// Peers eligible for primary-only policies.
        pub primary: Set<P>,
        /// Peers eligible for secondary-only policies.
        pub secondary: Set<P>,
    }
212
213    impl<P: PublicKey> TrackedPeers<P> {
214        pub const fn new(primary: Set<P>, secondary: Set<P>) -> Self {
215            Self { primary, secondary }
216        }
217
218        pub fn primary(primary: Set<P>) -> Self {
219            Self::new(primary, Set::default())
220        }
221
222        /// Returns the deduplicated union of primary and secondary peers.
223        pub fn union(self) -> Set<P> {
224            Set::from_iter_dedup(self.primary.into_iter().chain(self.secondary))
225        }
226    }
227
228    impl<P: PublicKey> From<Set<P>> for TrackedPeers<P> {
229        fn from(primary: Set<P>) -> Self {
230            Self::primary(primary)
231        }
232    }
233
234    impl<P: PublicKey> Default for TrackedPeers<P> {
235        fn default() -> Self {
236            Self::new(Set::default(), Set::default())
237        }
238    }
239
    /// Primary and secondary peers provided together to [`AddressableManager::track`].
    ///
    /// The same public key may appear in both maps. [`AddressableManager::track`]
    /// deduplicates overlapping keys, storing them as primary only.
    ///
    /// The struct itself performs no deduplication between the two maps; both
    /// fields are stored exactly as given.
    #[derive(Clone, Debug)]
    pub struct AddressableTrackedPeers<P: PublicKey> {
        /// Addresses for peers eligible for primary-only policies.
        pub primary: Map<P, Address>,
        /// Addresses for peers eligible for secondary-only policies.
        pub secondary: Map<P, Address>,
    }
251
252    impl<P: PublicKey> AddressableTrackedPeers<P> {
253        pub const fn new(primary: Map<P, Address>, secondary: Map<P, Address>) -> Self {
254            Self { primary, secondary }
255        }
256
257        pub fn primary(primary: Map<P, Address>) -> Self {
258            Self::new(primary, Map::default())
259        }
260    }
261
262    impl<P: PublicKey> From<Map<P, Address>> for AddressableTrackedPeers<P> {
263        fn from(primary: Map<P, Address>) -> Self {
264            Self::primary(primary)
265        }
266    }
267
    /// Interface for reading peer set information.
    pub trait Provider: Debug + Clone + Send + 'static {
        /// Public key type used to identify peers.
        type PublicKey: PublicKey;

        /// Fetch the primary and secondary peers tracked at the given ID.
        ///
        /// The future resolves to an `Option`; `None` presumably means no peer
        /// set is tracked at `id` (NOTE(review): confirm against implementations).
        fn peer_set(
            &mut self,
            id: u64,
        ) -> impl Future<Output = Option<TrackedPeers<Self::PublicKey>>> + Send;

        /// Subscribe to notifications when new peer sets are added.
        ///
        /// Returns a receiver of [`PeerSetUpdate`] notifications. Each update's
        /// `latest` reflects how [`Manager::track`] stored the set: a peer listed in
        /// both roles appears only under `latest.primary`. The `all` field aggregates
        /// across tracked sets with the same rule (secondary excludes keys present as primary).
        fn subscribe(
            &mut self,
        ) -> impl Future<Output = PeerSetSubscription<Self::PublicKey>> + Send;
    }
289
    /// Interface for managing peer set membership (where peer addresses are not known).
    pub trait Manager: Provider {
        /// Track a primary and secondary peer set with the given ID.
        ///
        /// The peer set ID passed to this function should be strictly managed, ideally matching the epoch
        /// of the consensus engine. It must be monotonically increasing as new peer sets are
        /// tracked.
        ///
        /// For good connectivity, all peers must track the same peer sets at the same ID.
        ///
        /// Callers may pass either a `Set` of primary peers or a [`TrackedPeers`] value containing both
        /// primary and secondary peers (anything that converts `Into<TrackedPeers<_>>`).
        ///
        /// Overlapping keys in [`TrackedPeers`] are allowed; they are deduplicated as primary only.
        ///
        /// ## Active Peers
        ///
        /// The most recently registered peer set (highest ID) is considered the
        /// active set. Implementations use the active set to decide which peers to
        /// maintain connections with and which to disconnect from.
        ///
        /// ## Primary vs Secondary Peers
        ///
        /// In p2p networks, there are often two tiers of peers: ones that help "drive progress" and ones that want to
        /// "follow that progress" (but not contribute to it). We call the former "primary" and the latter "secondary".
        /// When both are tracked, mechanisms favor "primary" peers but continue to replicate data to "secondary" peers
        /// (often both gossiping data to them and answering requests from them).
        fn track<R>(&mut self, id: u64, peers: R) -> Feedback
        where
            R: Into<TrackedPeers<Self::PublicKey>> + Send;
    }
320
    /// Interface for managing peer set membership (where peer addresses are known).
    pub trait AddressableManager: Provider {
        /// Track a primary peer set and secondary peers with the given ID.
        ///
        /// The peer set ID passed to this function should be strictly managed, ideally matching the epoch
        /// of the consensus engine. It must be monotonically increasing as new peer sets are
        /// tracked.
        ///
        /// For good connectivity, all peers must track the same peer sets at the same ID.
        ///
        /// Callers may pass either a `Map` from primary peers to their addresses or an
        /// [`AddressableTrackedPeers`] value containing both primary and secondary peers.
        ///
        /// The same key may appear in both maps; see [`AddressableTrackedPeers`].
        ///
        /// ## Active Peers
        ///
        /// The most recently registered peer set (highest ID) is considered the
        /// active set. Implementations use the active set to decide which peers to
        /// maintain connections with and which to disconnect from.
        ///
        /// ## Primary vs Secondary Peers
        ///
        /// In p2p networks, there are often two tiers of peers: ones that help "drive progress" and ones that want to
        /// "follow that progress" (but not contribute to it). We call the former "primary" and the latter "secondary".
        /// When both are tracked, mechanisms favor "primary" peers but continue to replicate data to "secondary" peers
        /// (often both gossiping data to them and answering requests from them).
        fn track<R>(&mut self, id: u64, peers: R) -> Feedback
        where
            R: Into<AddressableTrackedPeers<Self::PublicKey>> + Send;

        /// Update addresses for multiple peers without creating a new peer set.
        ///
        /// For each primary or secondary peer with a changed address:
        /// - Any existing connection to the peer is severed (it was on the old IP)
        /// - The listener's allowed IPs are updated to reflect the new egress IP
        /// - Future connections will use the new address
        fn overwrite(&mut self, peers: Map<Self::PublicKey, Address>) -> Feedback;
    }
360
    /// Interface for blocking other peers.
    pub trait Blocker: Clone + Send + 'static {
        /// Public key type used to identify peers.
        type PublicKey: PublicKey;

        /// Block a peer, disconnecting them if currently connected and preventing future connections.
        ///
        /// Returns the [`Feedback`] produced by submitting the block request.
        fn block(&mut self, peer: Self::PublicKey) -> Feedback;
    }
369});
370
/// Emits a warning about a peer and then blocks that peer, in one invocation.
///
/// The macro pairs a [`tracing::warn!`] event with a [`Blocker::block`] call so
/// that every block site is logged the same way. The peer expression is
/// evaluated once and is always attached to the event as the `peer` field
/// (formatted with `Debug`); all remaining tokens are forwarded to
/// [`tracing::warn!`] unchanged.
///
/// The expansion is a sequence of statements (it begins with a `let`), so
/// invoke it where statements are allowed.
///
/// # Examples
///
/// ```ignore
/// block!(self.blocker, sender, "invalid message");
/// block!(self.blocker, sender, ?err, "invalid ack signature");
/// block!(self.blocker, sender, %view, "blocking peer for epoch mismatch");
/// ```
#[cfg(not(any(
    commonware_stability_GAMMA,
    commonware_stability_DELTA,
    commonware_stability_EPSILON,
    commonware_stability_RESERVED
)))] // BETA
#[macro_export]
macro_rules! block {
    ($blocker:expr, $peer:expr, $($arg:tt)+) => {
        // Bind once so the peer expression is not evaluated twice.
        let offender = $peer;
        tracing::warn!(peer = ?offender, $($arg)+);
        #[allow(clippy::disallowed_methods)]
        $blocker.block(offender)
    };
}
399
/// Blocks `peer` through `blocker` directly, without emitting any log event.
///
/// Only compiled for tests; returns whatever [`Blocker::block`] returns.
#[allow(
    clippy::disallowed_methods,
    reason = "test helper that bypasses the block! macro"
)]
#[cfg(test)]
pub fn block_peer<B>(blocker: &mut B, peer: B::PublicKey) -> Feedback
where
    B: Blocker,
{
    B::block(blocker, peer)
}