commonware_p2p/lib.rs
1//! Communicate with authenticated peers over encrypted connections.
2//!
3//! # Status
4//!
5//! Stability varies by primitive. See [README](https://github.com/commonwarexyz/monorepo#stability) for details.
6
7#![doc(
8 html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
9 html_favicon_url = "https://commonware.xyz/favicon.ico"
10)]
11
12use commonware_macros::{stability_mod, stability_scope};
13
14stability_mod!(ALPHA, pub mod simulated);
15
16stability_scope!(BETA {
17 use commonware_actor::{Feedback, Unreliable};
18 use commonware_cryptography::PublicKey;
19 use commonware_runtime::{IoBuf, IoBufs};
20 use commonware_utils::{
21 channel::mpsc,
22 ordered::{Map, Set},
23 };
24 use std::{error::Error as StdError, fmt::Debug, future::Future, time::SystemTime};
25
26 pub mod authenticated;
27 pub mod types;
28 pub mod utils;
29
30 pub use types::{Address, Ingress};
31
32 /// Tuple representing a message received from a given public key.
33 ///
34 /// This message is guaranteed to adhere to the configuration of the channel and
35 /// will already be decrypted and authenticated.
36 pub type Message<P> = (P, IoBuf);
37
/// Identifier used to tell communication channels apart.
pub type Channel = u64;
40
41 /// Enum indicating the set of recipients to send a message to.
42 #[derive(Clone, Debug)]
43 pub enum Recipients<P: PublicKey> {
44 All,
45 Some(Vec<P>),
46 One(P),
47 }
48
49 /// Interface for sending messages to a set of recipients without rate-limiting restrictions.
50 pub trait UnlimitedSender: Clone + Send + Sync + 'static {
51 /// Public key type used to identify recipients.
52 type PublicKey: PublicKey;
53
54 /// Sends a message to a set of recipients.
55 ///
56 /// # Offline Recipients
57 ///
58 /// If a recipient is offline at the time a message is sent, the message
59 /// will be dropped. It is up to the application to handle retries (if
60 /// necessary).
61 ///
62 /// # Returns
63 ///
64 /// Feedback from submitting the message for delivery.
65 /// [`Unreliable`] indicates that local submission may be rejected under backpressure.
66 /// [`Feedback::accepted`] does not guarantee that the recipient will receive the message.
67 fn send(
68 &mut self,
69 recipients: Recipients<Self::PublicKey>,
70 message: impl Into<IoBufs> + Send,
71 priority: bool,
72 ) -> Unreliable<Feedback>;
73 }
74
75 /// Interface for constructing a [`CheckedSender`] from a set of [`Recipients`],
76 /// filtering out any that are currently rate-limited.
77 pub trait LimitedSender: Clone + Send + Sync + 'static {
78 /// Public key type used to identify recipients.
79 type PublicKey: PublicKey;
80
81 /// The type of [`CheckedSender`] returned after checking recipients.
82 type Checked<'a>: CheckedSender<PublicKey = Self::PublicKey> + Send
83 where
84 Self: 'a;
85
86 /// Checks which recipients are within their rate limit and returns a
87 /// [`CheckedSender`] for sending to them.
88 ///
89 /// # Rate Limiting
90 ///
91 /// Recipients that exceed their rate limit will be filtered out. The
92 /// returned [`CheckedSender`] will only send to non-limited recipients.
93 ///
94 /// # Returns
95 ///
96 /// A [`CheckedSender`] containing only the recipients that are not
97 /// currently rate-limited, or an error with the earliest instant at which
98 /// all recipients will be available if all are rate-limited.
99 fn check(
100 &mut self,
101 recipients: Recipients<Self::PublicKey>,
102 ) -> Result<Self::Checked<'_>, SystemTime>;
103 }
104
105 /// Interface for sending messages to [`Recipients`] that are not currently rate-limited.
106 pub trait CheckedSender: Send {
107 /// Public key type used to identify [`Recipients`].
108 type PublicKey: PublicKey;
109
110 /// Returns the recipients retained by the check.
111 fn recipients(&self) -> Vec<Self::PublicKey>;
112
113 /// Sends a message to the pre-checked recipients.
114 ///
115 /// # Offline Recipients
116 ///
117 /// If a recipient is offline at the time a message is sent, the message
118 /// will be dropped. It is up to the application to handle retries (if
119 /// necessary).
120 ///
121 /// # Returns
122 ///
123 /// Feedback from submitting the message for delivery.
124 /// [`Unreliable`] indicates that local submission may be rejected under backpressure.
125 /// [`Feedback::accepted`] does not guarantee that the recipient will receive the message.
126 fn send(self, message: impl Into<IoBufs> + Send, priority: bool) -> Unreliable<Feedback>;
127 }
128
129 /// Interface for sending messages to a set of recipients.
130 pub trait Sender: LimitedSender {
131 /// Sends a message to a set of recipients.
132 ///
133 /// # Offline Recipients
134 ///
135 /// If a recipient is offline at the time a message is sent, the message
136 /// will be dropped. It is up to the application to handle retries (if
137 /// necessary).
138 ///
139 /// # Rate Limiting
140 ///
141 /// Recipients that exceed their rate limit will be skipped. The message is
142 /// still sent to non-limited recipients.
143 ///
144 /// # Returns
145 ///
146 /// The recipients we will attempt to send to. Returns an
147 /// empty list if all recipients are rate-limited, the sender has closed, or the send is
148 /// not accepted.
149 fn send(
150 &mut self,
151 recipients: Recipients<Self::PublicKey>,
152 message: impl Into<IoBufs> + Send,
153 priority: bool,
154 ) -> Vec<Self::PublicKey> {
155 self.check(recipients).map_or_else(
156 |_| Vec::new(),
157 |checked_sender| {
158 let recipients = checked_sender.recipients();
159 let feedback = checked_sender.send(message, priority);
160 if feedback.accepted() {
161 recipients
162 } else {
163 Vec::new()
164 }
165 },
166 )
167 }
168 }
169
170 // Blanket implementation of `Sender` for all `LimitedSender`s.
171 impl<S: LimitedSender> Sender for S {}
172
173 /// Interface for receiving messages from arbitrary recipients.
174 pub trait Receiver: Debug + Send + 'static {
175 /// Error that can occur when receiving a message.
176 type Error: Debug + StdError + Send + Sync;
177
178 /// Public key type used to identify recipients.
179 type PublicKey: PublicKey;
180
181 /// Receive a message from an arbitrary recipient.
182 fn recv(
183 &mut self,
184 ) -> impl Future<Output = Result<Message<Self::PublicKey>, Self::Error>> + Send;
185 }
186
187 /// Notification sent to subscribers when a peer set changes.
188 #[derive(Clone, Debug)]
189 pub struct PeerSetUpdate<P: PublicKey> {
190 /// The index of the peer set that changed.
191 pub index: u64,
192 /// The primary and secondary peers in the new set.
193 pub latest: TrackedPeers<P>,
194 /// Union of primary and secondary peers across all tracked peer sets.
195 pub all: TrackedPeers<P>,
196 }
197
198 /// Alias for the subscription type returned by [`Provider::subscribe`].
199 pub type PeerSetSubscription<P> = mpsc::UnboundedReceiver<PeerSetUpdate<P>>;
200
201 /// Primary and secondary peers provided together to [`Manager::track`].
202 ///
203 /// The same public key may appear in both `primary` and `secondary`. [`Manager::track`]
204 /// deduplicates overlapping keys, storing them as primary only.
205 #[derive(Clone, Debug, PartialEq, Eq)]
206 pub struct TrackedPeers<P: PublicKey> {
207 /// Peers eligible for primary-only policies.
208 pub primary: Set<P>,
209 /// Peers eligible for secondary-only policies.
210 pub secondary: Set<P>,
211 }
212
213 impl<P: PublicKey> TrackedPeers<P> {
214 pub const fn new(primary: Set<P>, secondary: Set<P>) -> Self {
215 Self { primary, secondary }
216 }
217
218 pub fn primary(primary: Set<P>) -> Self {
219 Self::new(primary, Set::default())
220 }
221
222 /// Returns the deduplicated union of primary and secondary peers.
223 pub fn union(self) -> Set<P> {
224 Set::from_iter_dedup(self.primary.into_iter().chain(self.secondary))
225 }
226 }
227
228 impl<P: PublicKey> From<Set<P>> for TrackedPeers<P> {
229 fn from(primary: Set<P>) -> Self {
230 Self::primary(primary)
231 }
232 }
233
234 impl<P: PublicKey> Default for TrackedPeers<P> {
235 fn default() -> Self {
236 Self::new(Set::default(), Set::default())
237 }
238 }
239
240 /// Primary and secondary peers provided together to [`AddressableManager::track`].
241 ///
242 /// The same public key may appear in both maps. [`AddressableManager::track`]
243 /// deduplicates overlapping keys, storing them as primary only.
244 #[derive(Clone, Debug)]
245 pub struct AddressableTrackedPeers<P: PublicKey> {
246 /// Addresses for peers eligible for primary-only policies.
247 pub primary: Map<P, Address>,
248 /// Addresses for peers eligible for secondary-only policies.
249 pub secondary: Map<P, Address>,
250 }
251
252 impl<P: PublicKey> AddressableTrackedPeers<P> {
253 pub const fn new(primary: Map<P, Address>, secondary: Map<P, Address>) -> Self {
254 Self { primary, secondary }
255 }
256
257 pub fn primary(primary: Map<P, Address>) -> Self {
258 Self::new(primary, Map::default())
259 }
260 }
261
262 impl<P: PublicKey> From<Map<P, Address>> for AddressableTrackedPeers<P> {
263 fn from(primary: Map<P, Address>) -> Self {
264 Self::primary(primary)
265 }
266 }
267
268 /// Interface for reading peer set information.
269 pub trait Provider: Debug + Clone + Send + 'static {
270 /// Public key type used to identify peers.
271 type PublicKey: PublicKey;
272
273 /// Fetch the primary and secondary peers tracked at the given ID.
274 fn peer_set(
275 &mut self,
276 id: u64,
277 ) -> impl Future<Output = Option<TrackedPeers<Self::PublicKey>>> + Send;
278
279 /// Subscribe to notifications when new peer sets are added.
280 ///
281 /// Returns a receiver of [`PeerSetUpdate`] notifications. Each update's
282 /// `latest` reflects how [`Manager::track`] stored the set: a peer listed in
283 /// both roles appears only under `latest.primary`. The `all` field aggregates
284 /// across tracked sets with the same rule (secondary excludes keys present as primary).
285 fn subscribe(
286 &mut self,
287 ) -> impl Future<Output = PeerSetSubscription<Self::PublicKey>> + Send;
288 }
289
290 /// Interface for managing peer set membership (where peer addresses are not known).
291 pub trait Manager: Provider {
292 /// Track a primary and secondary peer set with the given ID.
293 ///
294 /// The peer set ID passed to this function should be strictly managed, ideally matching the epoch
295 /// of the consensus engine. It must be monotonically increasing as new peer sets are
296 /// tracked.
297 ///
298 /// For good connectivity, all peers must track the same peer sets at the same ID.
299 ///
300 /// Callers may pass either a list of primary peers or a [`TrackedPeers`] value containing both primary and secondary peers.
301 ///
302 /// Overlapping keys in [`TrackedPeers`] are allowed; they are deduplicated as primary only.
303 ///
304 /// ## Active Peers
305 ///
306 /// The most recently registered peer set (highest ID) is considered the
307 /// active set. Implementations use the active set to decide which peers to
308 /// maintain connections with and which to disconnect from.
309 ///
310 /// ## Primary vs Secondary Peers
311 ///
312 /// In p2p networks, there are often two tiers of peers: ones that help "drive progress" and ones that want to
313 /// "follow that progress" (but not contribute to it). We call the former "primary" and the latter "secondary".
314 /// When both are tracked, mechanisms favor "primary" peers but continue to replicate data to "secondary" peers (
315 /// often both gossiping data to them and answering requests from them).
316 fn track<R>(&mut self, id: u64, peers: R) -> Feedback
317 where
318 R: Into<TrackedPeers<Self::PublicKey>> + Send;
319 }
320
321 /// Interface for managing peer set membership (where peer addresses are known).
322 pub trait AddressableManager: Provider {
323 /// Track a primary peer set and secondary peers with the given ID.
324 ///
325 /// The peer set ID passed to this function should be strictly managed, ideally matching the epoch
326 /// of the consensus engine. It must be monotonically increasing as new peer sets are
327 /// tracked.
328 ///
329 /// For good connectivity, all peers must track the same peer sets at the same ID.
330 ///
331 /// Callers may pass either a list of primary peers or a [`AddressableTrackedPeers`] value containing
332 /// both primary and secondary peers.
333 ///
334 /// The same key may appear in both maps; see [`AddressableTrackedPeers`].
335 ///
336 /// ## Active Peers
337 ///
338 /// The most recently registered peer set (highest ID) is considered the
339 /// active set. Implementations use the active set to decide which peers to
340 /// maintain connections with and which to disconnect from.
341 ///
342 /// ## Primary vs Secondary Peers
343 ///
344 /// In p2p networks, there are often two tiers of peers: ones that help "drive progress" and ones that want to
345 /// "follow that progress" (but not contribute to it). We call the former "primary" and the latter "secondary".
346 /// When both are tracked, mechanisms favor "primary" peers but continue to replicate data to "secondary" peers (
347 /// often both gossiping data to them and answering requests from them).
348 fn track<R>(&mut self, id: u64, peers: R) -> Feedback
349 where
350 R: Into<AddressableTrackedPeers<Self::PublicKey>> + Send;
351
352 /// Update addresses for multiple peers without creating a new peer set.
353 ///
354 /// For each primary or secondary peer with a changed address:
355 /// - Any existing connection to the peer is severed (it was on the old IP)
356 /// - The listener's allowed IPs are updated to reflect the new egress IP
357 /// - Future connections will use the new address
358 fn overwrite(&mut self, peers: Map<Self::PublicKey, Address>) -> Feedback;
359 }
360
361 /// Interface for blocking other peers.
362 pub trait Blocker: Clone + Send + 'static {
363 /// Public key type used to identify peers.
364 type PublicKey: PublicKey;
365
366 /// Block a peer, disconnecting them if currently connected and preventing future connections.
367 fn block(&mut self, peer: Self::PublicKey) -> Feedback;
368 }
369});
370
/// Logs a warning and blocks a peer in a single call.
///
/// This macro combines a [`tracing::warn!`] with a [`Blocker::block`] call
/// to ensure consistent logging at every block site. The peer is always
/// included as a `peer` field in the log output.
///
/// The expansion is a single block expression that evaluates to whatever
/// `$blocker.block(peer)` returns, so the macro works both as a statement and
/// anywhere an expression is expected (a match arm, a `let` initializer, a
/// closure body).
///
/// # Examples
///
/// ```ignore
/// block!(self.blocker, sender, "invalid message");
/// block!(self.blocker, sender, ?err, "invalid ack signature");
/// block!(self.blocker, sender, %view, "blocking peer for epoch mismatch");
/// ```
#[cfg(not(any(
    commonware_stability_GAMMA,
    commonware_stability_DELTA,
    commonware_stability_EPSILON,
    commonware_stability_RESERVED
)))] // BETA
#[macro_export]
macro_rules! block {
    ($blocker:expr, $peer:expr, $($arg:tt)+) => {{
        // Evaluate `$peer` exactly once; the binding feeds both the log field and the call.
        let peer = $peer;
        tracing::warn!(peer = ?peer, $($arg)+);
        // This macro is the one place that calls `block` directly, hence the allow.
        #[allow(clippy::disallowed_methods)]
        $blocker.block(peer)
    }};
}
399
/// Blocks `peer` through `blocker` directly, without emitting a log line.
///
/// Compiled only for tests. Returns whatever [`Blocker::block`] returns.
#[cfg(test)]
#[allow(
    clippy::disallowed_methods,
    reason = "test helper that bypasses the block! macro"
)]
pub fn block_peer<B>(blocker: &mut B, peer: B::PublicKey) -> Feedback
where
    B: Blocker,
{
    B::block(blocker, peer)
}