commonware-p2p 2026.4.0

Communicate with authenticated peers over encrypted connections.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
//! Communicate with authenticated peers over encrypted connections.
//!
//! # Status
//!
//! Stability varies by primitive. See [README](https://github.com/commonwarexyz/monorepo#stability) for details.

#![doc(
    html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
    html_favicon_url = "https://commonware.xyz/favicon.ico"
)]

use commonware_macros::{stability_mod, stability_scope};

stability_mod!(ALPHA, pub mod simulated);

stability_scope!(BETA {
    use commonware_cryptography::PublicKey;
    use commonware_runtime::{IoBuf, IoBufs};
    use commonware_utils::{
        channel::mpsc,
        ordered::{Map, Set},
    };
    use std::{error::Error as StdError, fmt::Debug, future::Future, time::SystemTime};

    pub mod authenticated;
    pub mod types;
    pub mod utils;

    pub use types::{Address, Ingress};

    /// Tuple representing a message received from a given public key.
    ///
    /// This message is guaranteed to adhere to the configuration of the channel and
    /// will already be decrypted and authenticated.
    pub type Message<P> = (P, IoBuf);

    /// Alias for identifying communication channels.
    pub type Channel = u64;

    /// Enum indicating the set of recipients to send a message to.
    #[derive(Clone, Debug)]
    pub enum Recipients<P: PublicKey> {
        All,
        Some(Vec<P>),
        One(P),
    }

    /// Interface for sending messages to a set of recipients without rate-limiting restrictions.
    pub trait UnlimitedSender: Clone + Send + Sync + 'static {
        /// Public key type used to identify recipients.
        type PublicKey: PublicKey;

        /// Error that can occur when sending a message.
        type Error: Debug + StdError + Send + Sync + 'static;

        /// Sends a message to a set of recipients.
        ///
        /// # Offline Recipients
        ///
        /// If a recipient is offline at the time a message is sent, the message
        /// will be dropped. It is up to the application to handle retries (if
        /// necessary).
        ///
        /// # Returns
        ///
        /// A vector of recipients that the message was sent to, or an error if the
        /// message could not be sent due to a validation failure (e.g., too large).
        ///
        /// Note: a successful send does not guarantee that the recipient will
        /// receive the message.
        ///
        /// # Graceful Shutdown
        ///
        /// Implementations must handle internal channel closures gracefully during
        /// shutdown. If the underlying network is shutting down, this method should
        /// return `Ok` (possibly with an empty or partial recipient list) rather
        /// than an error. Errors should only be returned for validation failures
        /// that the caller can act upon.
        fn send(
            &mut self,
            recipients: Recipients<Self::PublicKey>,
            message: impl Into<IoBufs> + Send,
            priority: bool,
        ) -> impl Future<Output = Result<Vec<Self::PublicKey>, Self::Error>> + Send;
    }

    /// Interface for constructing a [`CheckedSender`] from a set of [`Recipients`],
    /// filtering out any that are currently rate-limited.
    pub trait LimitedSender: Clone + Send + Sync + 'static {
        /// Public key type used to identify recipients.
        type PublicKey: PublicKey;

        /// The type of [`CheckedSender`] returned after checking recipients.
        type Checked<'a>: CheckedSender<PublicKey = Self::PublicKey> + Send
        where
            Self: 'a;

        /// Checks which recipients are within their rate limit and returns a
        /// [`CheckedSender`] for sending to them.
        ///
        /// # Rate Limiting
        ///
        /// Recipients that exceed their rate limit will be filtered out. The
        /// returned [`CheckedSender`] will only send to non-limited recipients.
        ///
        /// # Returns
        ///
        /// A [`CheckedSender`] containing only the recipients that are not
        /// currently rate-limited, or an error with the earliest instant at which
        /// all recipients will be available if all are rate-limited.
        fn check<'a>(
            &'a mut self,
            recipients: Recipients<Self::PublicKey>,
        ) -> impl Future<Output = Result<Self::Checked<'a>, SystemTime>> + Send;
    }

    /// Interface for sending messages to [`Recipients`] that are not currently rate-limited.
    pub trait CheckedSender: Send {
        /// Public key type used to identify [`Recipients`].
        type PublicKey: PublicKey;

        /// Error that can occur when sending a message.
        type Error: Debug + StdError + Send + Sync + 'static;

        /// Sends a message to the pre-checked recipients.
        ///
        /// # Offline Recipients
        ///
        /// If a recipient is offline at the time a message is sent, the message
        /// will be dropped. It is up to the application to handle retries (if
        /// necessary).
        ///
        /// # Returns
        ///
        /// A vector of recipients that the message was sent to, or an error if the
        /// message could not be sent due to a validation failure (e.g., too large).
        ///
        /// Note: a successful send does not guarantee that the recipient will
        /// receive the message.
        ///
        /// # Graceful Shutdown
        ///
        /// Implementations must handle internal channel closures gracefully during
        /// shutdown. If the underlying network is shutting down, this method should
        /// return `Ok` (possibly with an empty or partial recipient list) rather
        /// than an error. Errors should only be returned for validation failures
        /// that the caller can act upon.
        fn send(
            self,
            message: impl Into<IoBufs> + Send,
            priority: bool,
        ) -> impl Future<Output = Result<Vec<Self::PublicKey>, Self::Error>> + Send;
    }

    /// Interface for sending messages to a set of recipients.
    pub trait Sender: LimitedSender {
        /// Sends a message to a set of recipients.
        ///
        /// # Offline Recipients
        ///
        /// If a recipient is offline at the time a message is sent, the message
        /// will be dropped. It is up to the application to handle retries (if
        /// necessary).
        ///
        /// # Rate Limiting
        ///
        /// Recipients that exceed their rate limit will be skipped. The message is
        /// still sent to non-limited recipients. Check the returned vector to see
        /// which peers were sent the message.
        ///
        /// # Returns
        ///
        /// A vector of recipients that the message was sent to, or an error if the
        /// message could not be sent due to a validation failure (e.g., too large).
        ///
        /// Note: a successful send does not guarantee that the recipient will
        /// receive the message.
        ///
        /// # Graceful Shutdown
        ///
        /// Implementations must handle internal channel closures gracefully during
        /// shutdown. If the underlying network is shutting down, this method should
        /// return `Ok` (possibly with an empty or partial recipient list) rather
        /// than an error. Errors should only be returned for validation failures
        /// that the caller can act upon.
        fn send(
            &mut self,
            recipients: Recipients<Self::PublicKey>,
            message: impl Into<IoBufs> + Send,
            priority: bool,
        ) -> impl Future<
            Output = Result<Vec<Self::PublicKey>, <Self::Checked<'_> as CheckedSender>::Error>,
        > + Send {
            async move {
                match self.check(recipients).await {
                    Ok(checked_sender) => checked_sender.send(message, priority).await,
                    Err(_) => Ok(Vec::new()),
                }
            }
        }
    }

    // Blanket implementation of `Sender` for all `LimitedSender`s.
    impl<S: LimitedSender> Sender for S {}

    /// Interface for receiving messages from arbitrary recipients.
    pub trait Receiver: Debug + Send + 'static {
        /// Error that can occur when receiving a message.
        type Error: Debug + StdError + Send + Sync;

        /// Public key type used to identify recipients.
        type PublicKey: PublicKey;

        /// Receive a message from an arbitrary recipient.
        fn recv(
            &mut self,
        ) -> impl Future<Output = Result<Message<Self::PublicKey>, Self::Error>> + Send;
    }

    /// Notification sent to subscribers when a peer set changes.
    #[derive(Clone, Debug)]
    pub struct PeerSetUpdate<P: PublicKey> {
        /// The index of the peer set that changed.
        pub index: u64,
        /// The primary and secondary peers in the new set.
        pub latest: TrackedPeers<P>,
        /// Union of primary and secondary peers across all tracked peer sets.
        pub all: TrackedPeers<P>,
    }

    /// Alias for the subscription type returned by [`Provider::subscribe`].
    pub type PeerSetSubscription<P> = mpsc::UnboundedReceiver<PeerSetUpdate<P>>;

    /// Primary and secondary peers provided together to [`Manager::track`].
    ///
    /// The same public key may appear in both `primary` and `secondary`. [`Manager::track`]
    /// deduplicates overlapping keys, storing them as primary only.
    #[derive(Clone, Debug, PartialEq, Eq)]
    pub struct TrackedPeers<P: PublicKey> {
        /// Peers eligible for primary-only policies.
        pub primary: Set<P>,
        /// Peers eligible for secondary-only policies.
        pub secondary: Set<P>,
    }

    impl<P: PublicKey> TrackedPeers<P> {
        pub const fn new(primary: Set<P>, secondary: Set<P>) -> Self {
            Self { primary, secondary }
        }

        pub fn primary(primary: Set<P>) -> Self {
            Self::new(primary, Set::default())
        }

        /// Returns the deduplicated union of primary and secondary peers.
        pub fn union(self) -> Set<P> {
            Set::from_iter_dedup(self.primary.into_iter().chain(self.secondary))
        }
    }

    impl<P: PublicKey> From<Set<P>> for TrackedPeers<P> {
        fn from(primary: Set<P>) -> Self {
            Self::primary(primary)
        }
    }

    impl<P: PublicKey> Default for TrackedPeers<P> {
        fn default() -> Self {
            Self::new(Set::default(), Set::default())
        }
    }

    /// Primary and secondary peers provided together to [`AddressableManager::track`].
    ///
    /// The same public key may appear in both maps. [`AddressableManager::track`]
    /// deduplicates overlapping keys, storing them as primary only.
    #[derive(Clone, Debug)]
    pub struct AddressableTrackedPeers<P: PublicKey> {
        /// Addresses for peers eligible for primary-only policies.
        pub primary: Map<P, Address>,
        /// Addresses for peers eligible for secondary-only policies.
        pub secondary: Map<P, Address>,
    }

    impl<P: PublicKey> AddressableTrackedPeers<P> {
        pub const fn new(primary: Map<P, Address>, secondary: Map<P, Address>) -> Self {
            Self { primary, secondary }
        }

        pub fn primary(primary: Map<P, Address>) -> Self {
            Self::new(primary, Map::default())
        }
    }

    impl<P: PublicKey> From<Map<P, Address>> for AddressableTrackedPeers<P> {
        fn from(primary: Map<P, Address>) -> Self {
            Self::primary(primary)
        }
    }

    /// Interface for reading peer set information.
    pub trait Provider: Debug + Clone + Send + 'static {
        /// Public key type used to identify peers.
        type PublicKey: PublicKey;

        /// Fetch the primary and secondary peers tracked at the given ID.
        fn peer_set(
            &mut self,
            id: u64,
        ) -> impl Future<Output = Option<TrackedPeers<Self::PublicKey>>> + Send;

        /// Subscribe to notifications when new peer sets are added.
        ///
        /// Returns a receiver of [`PeerSetUpdate`] notifications. Each update's
        /// `latest` reflects how [`Manager::track`] stored the set: a peer listed in
        /// both roles appears only under `latest.primary`. The `all` field aggregates
        /// across tracked sets with the same rule (secondary excludes keys present as primary).
        fn subscribe(
            &mut self,
        ) -> impl Future<Output = PeerSetSubscription<Self::PublicKey>> + Send;
    }

    /// Interface for managing peer set membership (where peer addresses are not known).
    pub trait Manager: Provider {
        /// Track a primary and secondary peer set with the given ID.
        ///
        /// The peer set ID passed to this function should be strictly managed, ideally matching the epoch
        /// of the consensus engine. It must be monotonically increasing as new peer sets are
        /// tracked.
        ///
        /// For good connectivity, all peers must track the same peer sets at the same ID.
        ///
        /// Callers may pass either a list of primary peers or a [`TrackedPeers`] value containing both primary and secondary peers.
        ///
        /// Overlapping keys in [`TrackedPeers`] are allowed; they are deduplicated as primary only.
        ///
        /// ## Active Peers
        ///
        /// The most recently registered peer set (highest ID) is considered the
        /// active set. Implementations use the active set to decide which peers to
        /// maintain connections with and which to disconnect from.
        ///
        /// ## Primary vs Secondary Peers
        ///
        /// In p2p networks, there are often two tiers of peers: ones that help "drive progress" and ones that want to
        /// "follow that progress" (but not contribute to it). We call the former "primary" and the latter "secondary".
        /// When both are tracked, mechanisms favor "primary" peers but continue to replicate data to "secondary" peers (
        /// often both gossiping data to them and answering requests from them).
        fn track<R>(
            &mut self,
            id: u64,
            peers: R,
        ) -> impl Future<Output = ()> + Send
        where
            R: Into<TrackedPeers<Self::PublicKey>> + Send;
    }

    /// Interface for managing peer set membership (where peer addresses are known).
    pub trait AddressableManager: Provider {
        /// Track a primary peer set and secondary peers with the given ID.
        ///
        /// The peer set ID passed to this function should be strictly managed, ideally matching the epoch
        /// of the consensus engine. It must be monotonically increasing as new peer sets are
        /// tracked.
        ///
        /// For good connectivity, all peers must track the same peer sets at the same ID.
        ///
        /// Callers may pass either a list of primary peers or a [`AddressableTrackedPeers`] value containing
        /// both primary and secondary peers.
        ///
        /// The same key may appear in both maps; see [`AddressableTrackedPeers`].
        ///
        /// ## Active Peers
        ///
        /// The most recently registered peer set (highest ID) is considered the
        /// active set. Implementations use the active set to decide which peers to
        /// maintain connections with and which to disconnect from.
        ///
        /// ## Primary vs Secondary Peers
        ///
        /// In p2p networks, there are often two tiers of peers: ones that help "drive progress" and ones that want to
        /// "follow that progress" (but not contribute to it). We call the former "primary" and the latter "secondary".
        /// When both are tracked, mechanisms favor "primary" peers but continue to replicate data to "secondary" peers (
        /// often both gossiping data to them and answering requests from them).
        fn track<R>(
            &mut self,
            id: u64,
            peers: R,
        ) -> impl Future<Output = ()> + Send
        where
            R: Into<AddressableTrackedPeers<Self::PublicKey>> + Send;

        /// Update addresses for multiple peers without creating a new peer set.
        ///
        /// For each primary or secondary peer with a changed address:
        /// - Any existing connection to the peer is severed (it was on the old IP)
        /// - The listener's allowed IPs are updated to reflect the new egress IP
        /// - Future connections will use the new address
        fn overwrite(
            &mut self,
            peers: Map<Self::PublicKey, Address>,
        ) -> impl Future<Output = ()> + Send;
    }

    /// Interface for blocking other peers.
    pub trait Blocker: Clone + Send + 'static {
        /// Public key type used to identify peers.
        type PublicKey: PublicKey;

        /// Block a peer, disconnecting them if currently connected and preventing future connections.
        fn block(&mut self, peer: Self::PublicKey) -> impl Future<Output = ()> + Send;
    }
});

/// Logs a warning and blocks a peer in a single call.
///
/// This macro combines a [`tracing::warn!`] with a [`Blocker::block`] call
/// to ensure consistent logging at every block site. The peer is always
/// included as a `peer` field in the log output.
///
/// # Examples
///
/// ```ignore
/// block!(self.blocker, sender, "invalid message");
/// block!(self.blocker, sender, ?err, "invalid ack signature");
/// block!(self.blocker, sender, %view, "blocking peer for epoch mismatch");
/// ```
#[cfg(not(any(
    commonware_stability_GAMMA,
    commonware_stability_DELTA,
    commonware_stability_EPSILON,
    commonware_stability_RESERVED
)))] // BETA
#[macro_export]
macro_rules! block {
    ($blocker:expr, $peer:expr, $($arg:tt)+) => {
        let peer = $peer;
        tracing::warn!(peer = ?peer, $($arg)+);
        #[allow(clippy::disallowed_methods)]
        $blocker.block(peer).await;
    };
}

/// Block a peer without logging.
#[allow(
    clippy::disallowed_methods,
    reason = "test helper that bypasses the block! macro"
)]
#[cfg(test)]
pub async fn block_peer<B: Blocker>(blocker: &mut B, peer: B::PublicKey) {
    blocker.block(peer).await;
}