commonware_p2p/lib.rs
1//! Communicate with authenticated peers over encrypted connections.
2//!
3//! # Status
4//!
5//! `commonware-p2p` is **ALPHA** software and is not yet recommended for production use. Developers should
6//! expect breaking changes and occasional instability.
7
8#![doc(
9 html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
10 html_favicon_url = "https://commonware.xyz/favicon.ico"
11)]
12
13use bytes::{Buf, Bytes};
14use commonware_cryptography::PublicKey;
15use commonware_utils::ordered::Set;
16use futures::channel::mpsc;
17use std::{error::Error as StdError, fmt::Debug, future::Future, time::SystemTime};
18
19pub mod authenticated;
20pub mod simulated;
21pub mod types;
22pub mod utils;
23
24pub use types::{Address, Ingress};
25
26/// Tuple representing a message received from a given public key.
27///
28/// This message is guaranteed to adhere to the configuration of the channel and
29/// will already be decrypted and authenticated.
30pub type Message<P> = (P, Bytes);
31
32/// Alias for identifying communication channels.
33pub type Channel = u64;
34
35/// Enum indicating the set of recipients to send a message to.
36#[derive(Clone, Debug)]
37pub enum Recipients<P: PublicKey> {
38 All,
39 Some(Vec<P>),
40 One(P),
41}
42
43/// Interface for sending messages to a set of recipients without rate-limiting restrictions.
44pub trait UnlimitedSender: Clone + Send + Sync + 'static {
45 /// Public key type used to identify recipients.
46 type PublicKey: PublicKey;
47
48 /// Error that can occur when sending a message.
49 type Error: Debug + StdError + Send + Sync + 'static;
50
51 /// Sends a message to a set of recipients.
52 ///
53 /// # Offline Recipients
54 ///
55 /// If a recipient is offline at the time a message is sent, the message
56 /// will be dropped. It is up to the application to handle retries (if
57 /// necessary).
58 ///
59 /// # Returns
60 ///
61 /// A vector of recipients that the message was sent to, or an error if the
62 /// message could not be sent due to a validation failure (e.g., too large).
63 ///
64 /// Note: a successful send does not guarantee that the recipient will
65 /// receive the message.
66 ///
67 /// # Graceful Shutdown
68 ///
69 /// Implementations must handle internal channel closures gracefully during
70 /// shutdown. If the underlying network is shutting down, this method should
71 /// return `Ok` (possibly with an empty or partial recipient list) rather
72 /// than an error. Errors should only be returned for validation failures
73 /// that the caller can act upon.
74 fn send(
75 &mut self,
76 recipients: Recipients<Self::PublicKey>,
77 message: impl Buf + Send,
78 priority: bool,
79 ) -> impl Future<Output = Result<Vec<Self::PublicKey>, Self::Error>> + Send;
80}
81
82/// Interface for constructing a [`CheckedSender`] from a set of [`Recipients`],
83/// filtering out any that are currently rate-limited.
84pub trait LimitedSender: Clone + Send + Sync + 'static {
85 /// Public key type used to identify recipients.
86 type PublicKey: PublicKey;
87
88 /// The type of [`CheckedSender`] returned after checking recipients.
89 type Checked<'a>: CheckedSender<PublicKey = Self::PublicKey> + Send
90 where
91 Self: 'a;
92
93 /// Checks which recipients are within their rate limit and returns a
94 /// [`CheckedSender`] for sending to them.
95 ///
96 /// # Rate Limiting
97 ///
98 /// Recipients that exceed their rate limit will be filtered out. The
99 /// returned [`CheckedSender`] will only send to non-limited recipients.
100 ///
101 /// # Returns
102 ///
103 /// A [`CheckedSender`] containing only the recipients that are not
104 /// currently rate-limited, or an error with the earliest instant at which
105 /// all recipients will be available if all are rate-limited.
106 fn check<'a>(
107 &'a mut self,
108 recipients: Recipients<Self::PublicKey>,
109 ) -> impl Future<Output = Result<Self::Checked<'a>, SystemTime>> + Send;
110}
111
112/// Interface for sending messages to [`Recipients`] that are not currently rate-limited.
113pub trait CheckedSender: Send {
114 /// Public key type used to identify [`Recipients`].
115 type PublicKey: PublicKey;
116
117 /// Error that can occur when sending a message.
118 type Error: Debug + StdError + Send + Sync + 'static;
119
120 /// Sends a message to the pre-checked recipients.
121 ///
122 /// # Offline Recipients
123 ///
124 /// If a recipient is offline at the time a message is sent, the message
125 /// will be dropped. It is up to the application to handle retries (if
126 /// necessary).
127 ///
128 /// # Returns
129 ///
130 /// A vector of recipients that the message was sent to, or an error if the
131 /// message could not be sent due to a validation failure (e.g., too large).
132 ///
133 /// Note: a successful send does not guarantee that the recipient will
134 /// receive the message.
135 ///
136 /// # Graceful Shutdown
137 ///
138 /// Implementations must handle internal channel closures gracefully during
139 /// shutdown. If the underlying network is shutting down, this method should
140 /// return `Ok` (possibly with an empty or partial recipient list) rather
141 /// than an error. Errors should only be returned for validation failures
142 /// that the caller can act upon.
143 fn send(
144 self,
145 message: impl Buf + Send,
146 priority: bool,
147 ) -> impl Future<Output = Result<Vec<Self::PublicKey>, Self::Error>> + Send;
148}
149
150/// Interface for sending messages to a set of recipients.
151pub trait Sender: LimitedSender {
152 /// Sends a message to a set of recipients.
153 ///
154 /// # Offline Recipients
155 ///
156 /// If a recipient is offline at the time a message is sent, the message
157 /// will be dropped. It is up to the application to handle retries (if
158 /// necessary).
159 ///
160 /// # Rate Limiting
161 ///
162 /// Recipients that exceed their rate limit will be skipped. The message is
163 /// still sent to non-limited recipients. Check the returned vector to see
164 /// which peers were sent the message.
165 ///
166 /// # Returns
167 ///
168 /// A vector of recipients that the message was sent to, or an error if the
169 /// message could not be sent due to a validation failure (e.g., too large).
170 ///
171 /// Note: a successful send does not guarantee that the recipient will
172 /// receive the message.
173 ///
174 /// # Graceful Shutdown
175 ///
176 /// Implementations must handle internal channel closures gracefully during
177 /// shutdown. If the underlying network is shutting down, this method should
178 /// return `Ok` (possibly with an empty or partial recipient list) rather
179 /// than an error. Errors should only be returned for validation failures
180 /// that the caller can act upon.
181 fn send(
182 &mut self,
183 recipients: Recipients<Self::PublicKey>,
184 message: impl Buf + Send,
185 priority: bool,
186 ) -> impl Future<
187 Output = Result<Vec<Self::PublicKey>, <Self::Checked<'_> as CheckedSender>::Error>,
188 > + Send {
189 async move {
190 match self.check(recipients).await {
191 Ok(checked_sender) => checked_sender.send(message, priority).await,
192 Err(_) => Ok(Vec::new()),
193 }
194 }
195 }
196}
197
198// Blanket implementation of `Sender` for all `LimitedSender`s.
199impl<S: LimitedSender> Sender for S {}
200
201/// Interface for receiving messages from arbitrary recipients.
202pub trait Receiver: Debug + Send + 'static {
203 /// Error that can occur when receiving a message.
204 type Error: Debug + StdError + Send + Sync;
205
206 /// Public key type used to identify recipients.
207 type PublicKey: PublicKey;
208
209 /// Receive a message from an arbitrary recipient.
210 fn recv(
211 &mut self,
212 ) -> impl Future<Output = Result<Message<Self::PublicKey>, Self::Error>> + Send;
213}
214
215/// Interface for registering new peer sets as well as fetching an ordered list of connected peers, given a set id.
216pub trait Manager: Debug + Clone + Send + 'static {
217 /// Public key type used to identify peers.
218 type PublicKey: PublicKey;
219
220 /// The type for the peer set in registration.
221 type Peers;
222
223 /// Update the peer set.
224 ///
225 /// The peer set ID passed to this function should be strictly managed, ideally matching the epoch
226 /// of the consensus engine. It must be monotonically increasing as new peer sets are registered.
227 fn update(&mut self, id: u64, peers: Self::Peers) -> impl Future<Output = ()> + Send;
228
229 /// Fetch the ordered set of peers for a given ID.
230 fn peer_set(&mut self, id: u64) -> impl Future<Output = Option<Set<Self::PublicKey>>> + Send;
231
232 /// Subscribe to notifications when new peer sets are added.
233 ///
234 /// Returns a receiver that will receive the peer set ID whenever a new peer set
235 /// is registered via `update`.
236 #[allow(clippy::type_complexity)]
237 fn subscribe(
238 &mut self,
239 ) -> impl Future<
240 Output = mpsc::UnboundedReceiver<(u64, Set<Self::PublicKey>, Set<Self::PublicKey>)>,
241 > + Send;
242}
243
244/// Interface for blocking other peers.
245pub trait Blocker: Clone + Send + 'static {
246 /// Public key type used to identify peers.
247 type PublicKey: PublicKey;
248
249 /// Block a peer, disconnecting them if currently connected and preventing future connections.
250 fn block(&mut self, peer: Self::PublicKey) -> impl Future<Output = ()> + Send;
251}