commonware_p2p/
lib.rs

1//! Communicate with authenticated peers over encrypted connections.
2//!
3//! # Status
4//!
5//! `commonware-p2p` is **ALPHA** software and is not yet recommended for production use. Developers should
6//! expect breaking changes and occasional instability.
7
8#![doc(
9    html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
10    html_favicon_url = "https://commonware.xyz/favicon.ico"
11)]
12
13use bytes::{Buf, Bytes};
14use commonware_cryptography::PublicKey;
15use commonware_utils::ordered::Set;
16use futures::channel::mpsc;
17use std::{error::Error as StdError, fmt::Debug, future::Future, time::SystemTime};
18
19pub mod authenticated;
20pub mod simulated;
21pub mod types;
22pub mod utils;
23
24pub use types::{Address, Ingress};
25
26/// Tuple representing a message received from a given public key.
27///
28/// This message is guaranteed to adhere to the configuration of the channel and
29/// will already be decrypted and authenticated.
30pub type Message<P> = (P, Bytes);
31
32/// Alias for identifying communication channels.
33pub type Channel = u64;
34
35/// Enum indicating the set of recipients to send a message to.
36#[derive(Clone, Debug)]
37pub enum Recipients<P: PublicKey> {
38    All,
39    Some(Vec<P>),
40    One(P),
41}
42
43/// Interface for sending messages to a set of recipients without rate-limiting restrictions.
44pub trait UnlimitedSender: Clone + Send + Sync + 'static {
45    /// Public key type used to identify recipients.
46    type PublicKey: PublicKey;
47
48    /// Error that can occur when sending a message.
49    type Error: Debug + StdError + Send + Sync + 'static;
50
51    /// Sends a message to a set of recipients.
52    ///
53    /// # Offline Recipients
54    ///
55    /// If a recipient is offline at the time a message is sent, the message
56    /// will be dropped. It is up to the application to handle retries (if
57    /// necessary).
58    ///
59    /// # Returns
60    ///
61    /// A vector of recipients that the message was sent to, or an error if the
62    /// message could not be sent due to a validation failure (e.g., too large).
63    ///
64    /// Note: a successful send does not guarantee that the recipient will
65    /// receive the message.
66    ///
67    /// # Graceful Shutdown
68    ///
69    /// Implementations must handle internal channel closures gracefully during
70    /// shutdown. If the underlying network is shutting down, this method should
71    /// return `Ok` (possibly with an empty or partial recipient list) rather
72    /// than an error. Errors should only be returned for validation failures
73    /// that the caller can act upon.
74    fn send(
75        &mut self,
76        recipients: Recipients<Self::PublicKey>,
77        message: impl Buf + Send,
78        priority: bool,
79    ) -> impl Future<Output = Result<Vec<Self::PublicKey>, Self::Error>> + Send;
80}
81
82/// Interface for constructing a [`CheckedSender`] from a set of [`Recipients`],
83/// filtering out any that are currently rate-limited.
84pub trait LimitedSender: Clone + Send + Sync + 'static {
85    /// Public key type used to identify recipients.
86    type PublicKey: PublicKey;
87
88    /// The type of [`CheckedSender`] returned after checking recipients.
89    type Checked<'a>: CheckedSender<PublicKey = Self::PublicKey> + Send
90    where
91        Self: 'a;
92
93    /// Checks which recipients are within their rate limit and returns a
94    /// [`CheckedSender`] for sending to them.
95    ///
96    /// # Rate Limiting
97    ///
98    /// Recipients that exceed their rate limit will be filtered out. The
99    /// returned [`CheckedSender`] will only send to non-limited recipients.
100    ///
101    /// # Returns
102    ///
103    /// A [`CheckedSender`] containing only the recipients that are not
104    /// currently rate-limited, or an error with the earliest instant at which
105    /// all recipients will be available if all are rate-limited.
106    fn check<'a>(
107        &'a mut self,
108        recipients: Recipients<Self::PublicKey>,
109    ) -> impl Future<Output = Result<Self::Checked<'a>, SystemTime>> + Send;
110}
111
112/// Interface for sending messages to [`Recipients`] that are not currently rate-limited.
113pub trait CheckedSender: Send {
114    /// Public key type used to identify [`Recipients`].
115    type PublicKey: PublicKey;
116
117    /// Error that can occur when sending a message.
118    type Error: Debug + StdError + Send + Sync + 'static;
119
120    /// Sends a message to the pre-checked recipients.
121    ///
122    /// # Offline Recipients
123    ///
124    /// If a recipient is offline at the time a message is sent, the message
125    /// will be dropped. It is up to the application to handle retries (if
126    /// necessary).
127    ///
128    /// # Returns
129    ///
130    /// A vector of recipients that the message was sent to, or an error if the
131    /// message could not be sent due to a validation failure (e.g., too large).
132    ///
133    /// Note: a successful send does not guarantee that the recipient will
134    /// receive the message.
135    ///
136    /// # Graceful Shutdown
137    ///
138    /// Implementations must handle internal channel closures gracefully during
139    /// shutdown. If the underlying network is shutting down, this method should
140    /// return `Ok` (possibly with an empty or partial recipient list) rather
141    /// than an error. Errors should only be returned for validation failures
142    /// that the caller can act upon.
143    fn send(
144        self,
145        message: impl Buf + Send,
146        priority: bool,
147    ) -> impl Future<Output = Result<Vec<Self::PublicKey>, Self::Error>> + Send;
148}
149
150/// Interface for sending messages to a set of recipients.
151pub trait Sender: LimitedSender {
152    /// Sends a message to a set of recipients.
153    ///
154    /// # Offline Recipients
155    ///
156    /// If a recipient is offline at the time a message is sent, the message
157    /// will be dropped. It is up to the application to handle retries (if
158    /// necessary).
159    ///
160    /// # Rate Limiting
161    ///
162    /// Recipients that exceed their rate limit will be skipped. The message is
163    /// still sent to non-limited recipients. Check the returned vector to see
164    /// which peers were sent the message.
165    ///
166    /// # Returns
167    ///
168    /// A vector of recipients that the message was sent to, or an error if the
169    /// message could not be sent due to a validation failure (e.g., too large).
170    ///
171    /// Note: a successful send does not guarantee that the recipient will
172    /// receive the message.
173    ///
174    /// # Graceful Shutdown
175    ///
176    /// Implementations must handle internal channel closures gracefully during
177    /// shutdown. If the underlying network is shutting down, this method should
178    /// return `Ok` (possibly with an empty or partial recipient list) rather
179    /// than an error. Errors should only be returned for validation failures
180    /// that the caller can act upon.
181    fn send(
182        &mut self,
183        recipients: Recipients<Self::PublicKey>,
184        message: impl Buf + Send,
185        priority: bool,
186    ) -> impl Future<
187        Output = Result<Vec<Self::PublicKey>, <Self::Checked<'_> as CheckedSender>::Error>,
188    > + Send {
189        async move {
190            match self.check(recipients).await {
191                Ok(checked_sender) => checked_sender.send(message, priority).await,
192                Err(_) => Ok(Vec::new()),
193            }
194        }
195    }
196}
197
198// Blanket implementation of `Sender` for all `LimitedSender`s.
199impl<S: LimitedSender> Sender for S {}
200
201/// Interface for receiving messages from arbitrary recipients.
202pub trait Receiver: Debug + Send + 'static {
203    /// Error that can occur when receiving a message.
204    type Error: Debug + StdError + Send + Sync;
205
206    /// Public key type used to identify recipients.
207    type PublicKey: PublicKey;
208
209    /// Receive a message from an arbitrary recipient.
210    fn recv(
211        &mut self,
212    ) -> impl Future<Output = Result<Message<Self::PublicKey>, Self::Error>> + Send;
213}
214
215/// Interface for registering new peer sets as well as fetching an ordered list of connected peers, given a set id.
216pub trait Manager: Debug + Clone + Send + 'static {
217    /// Public key type used to identify peers.
218    type PublicKey: PublicKey;
219
220    /// The type for the peer set in registration.
221    type Peers;
222
223    /// Update the peer set.
224    ///
225    /// The peer set ID passed to this function should be strictly managed, ideally matching the epoch
226    /// of the consensus engine. It must be monotonically increasing as new peer sets are registered.
227    fn update(&mut self, id: u64, peers: Self::Peers) -> impl Future<Output = ()> + Send;
228
229    /// Fetch the ordered set of peers for a given ID.
230    fn peer_set(&mut self, id: u64) -> impl Future<Output = Option<Set<Self::PublicKey>>> + Send;
231
232    /// Subscribe to notifications when new peer sets are added.
233    ///
234    /// Returns a receiver that will receive the peer set ID whenever a new peer set
235    /// is registered via `update`.
236    #[allow(clippy::type_complexity)]
237    fn subscribe(
238        &mut self,
239    ) -> impl Future<
240        Output = mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)>,
241    > + Send;
242}
243
244/// Interface for blocking other peers.
245pub trait Blocker: Clone + Send + 'static {
246    /// Public key type used to identify peers.
247    type PublicKey: PublicKey;
248
249    /// Block a peer, disconnecting them if currently connected and preventing future connections.
250    fn block(&mut self, peer: Self::PublicKey) -> impl Future<Output = ()> + Send;
251}