//! Communicate with a fixed set of authenticated peers over encrypted connections.
//!
//! `authenticated` provides multiplexed communication between fully-connected peers
//! identified by a developer-specified cryptographic identity (e.g., BLS, ed25519).
//! Peer discovery occurs automatically using ordered bit vectors (sorted by authorized
//! cryptographic identities) to efficiently communicate knowledge of dialable peers.
//!
//! # Features
//!
//! - Configurable Cryptography Scheme for Peer Identities (BLS, ed25519, etc.)
//! - Automatic Peer Discovery Using Bit Vectors (Also Used as Ping Messages)
//! - Multiplexing With Configurable Rate Limiting Per Channel and Send Prioritization
//! - Optional Message Compression (using `zstd`)
//!
//! # Design
//!
//! ## Discovery
//!
//! Peer discovery operates under the assumption that all peers are aware of and synchronized on
//! the composition of peer sets at specific, user-provided indices (`u64`). Each index maps to a
//! list of authorized `PublicKey`s (`(u64, Vec<PublicKey>)`). Based on this shared knowledge, each
//! peer can construct a sorted bit vector message (`BitVec`) representing its knowledge of the
//! dialable addresses ([`SocketAddr`](std::net::SocketAddr)) for the peers in that set.
//! The `BitVec` message contains:
//! - `index`: The `u64` index the bit vector applies to.
//! - `bits`: The bit vector itself, where a '1' signifies knowledge of the corresponding
//!   peer's address in the sorted list for that index.
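//!
//! As a rough illustration of the idea above (a minimal sketch, not this crate's
//! implementation), the bits for one peer set could be derived as follows. `Key` and
//! `knowledge_bits` are hypothetical names, and `Vec<bool>` stands in for the wire encoding:
//!
//! ```rust
//! use std::collections::{BTreeSet, HashMap};
//! use std::net::SocketAddr;
//!
//! // Hypothetical stand-in for a `PublicKey`; the real type depends on the chosen scheme.
//! type Key = [u8; 4];
//!
//! // Position `i` is `true` when an address is known for the `i`-th key in sorted order.
//! // A `BTreeSet` iterates in ascending order, so every peer derives the same positions.
//! fn knowledge_bits(peer_set: &BTreeSet<Key>, known: &HashMap<Key, SocketAddr>) -> Vec<bool> {
//!     peer_set.iter().map(|key| known.contains_key(key)).collect()
//! }
//! ```
//!
//! For a set of three keys where only the second (in sorted order) has a known address,
//! this yields `[false, true, false]`.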
//!
//! _Warning: If peers are not synchronized on the peer set composition at a given index,
//! discovery messages can be misinterpreted. A peer might associate a bit position with the
//! wrong peer or fail to parse the vector if its length doesn't match the expected set size. The
//! application layer is responsible for ensuring peer set synchronization._
//!
//! Due to their small size, these `BitVec` messages are exchanged periodically (configured by
//! `gossip_bit_vec_frequency` in the [`Config`]) between connected peers. This serves as both a
//! peer discovery mechanism and a keep-alive "ping" message to maintain the underlying
//! connection, especially during periods of low application-level traffic. The protocol supports
//! tracking multiple peer sets concurrently (up to `tracked_peer_sets`), each identified by its
//! `index`. This is useful, for instance, during transitions like distributed key generation
//! (DKG) where connections to both old and new peer sets are needed simultaneously.
//!
//! Upon receiving a `BitVec` message, a peer compares it against its own knowledge for the same
//! index. If the receiving peer knows addresses that the sender marked as '0' (unknown), it
//! selects a random subset of these known `PeerInfo` structures (up to `peer_gossip_max_count`)
//! and sends them back in a `Payload::Peers` message. Each `PeerInfo` structure attests to a
//! peer's address claim and contains:
//! - `socket`: The [`SocketAddr`](std::net::SocketAddr) of the peer.
//! - `timestamp`: A `u64` timestamp indicating when the address was attested.
//! - `public_key`: The peer's public key.
//! - `signature`: The peer's cryptographic signature over the `socket` and `timestamp`.
//!
//! If the receiver doesn't know any addresses the sender is unaware of, it sends no
//! `Payload::Peers` response; the received `BitVec` implicitly acts as a "pong".
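//!
//! The selection step described above might look roughly like the sketch below. It is a
//! simplified illustration under stated assumptions, not the tracker's actual logic: `Key` and
//! `peers_to_share` are hypothetical names, plain addresses stand in for signed `PeerInfo`
//! structures, and the first `max_count` candidates are taken instead of a random subset.
//!
//! ```rust
//! use std::collections::HashMap;
//! use std::net::SocketAddr;
//!
//! // Hypothetical stand-in for a `PublicKey`.
//! type Key = [u8; 4];
//!
//! // Returns the peers we know that the sender marked as unknown, capped at `max_count`.
//! // Returns `None` when the vector length does not match the peer set (unsynchronized sets).
//! fn peers_to_share(
//!     sorted_set: &[Key],
//!     sender_bits: &[bool],
//!     known: &HashMap<Key, SocketAddr>,
//!     max_count: usize,
//! ) -> Option<Vec<(Key, SocketAddr)>> {
//!     if sender_bits.len() != sorted_set.len() {
//!         return None;
//!     }
//!     let share = sorted_set
//!         .iter()
//!         .zip(sender_bits)
//!         .filter(|(_, sender_knows)| !**sender_knows)
//!         .filter_map(|(key, _)| known.get(key).map(|addr| (*key, *addr)))
//!         .take(max_count)
//!         .collect();
//!     Some(share)
//! }
//! ```
//!
//! An empty result corresponds to the case where no `Payload::Peers` response is sent.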
//!
//! If a peer receives a `PeerInfo` message (either directly or through gossip) containing a more
//! recent timestamp for a known peer's address, it updates its local `Record`. This updated
//! `PeerInfo` is also used in future gossip messages. Each peer generates its own signed
//! `PeerInfo` upon startup and sends it immediately after establishing a connection (following
//! the cryptographic handshake). This ensures that if a peer connects using an outdated address
//! record, it will be corrected promptly by the peer being dialed.
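//!
//! A minimal sketch of the "newer timestamp wins" rule described above, assuming signature
//! verification has already succeeded. `Key`, `Info`, and `update_record` are hypothetical
//! names rather than this crate's types, and inserting a record for a previously unknown key
//! is an assumption of the sketch:
//!
//! ```rust
//! use std::collections::HashMap;
//! use std::net::SocketAddr;
//!
//! // Hypothetical stand-in for a `PublicKey`.
//! type Key = [u8; 4];
//!
//! // Simplified view of an attested address (signature omitted).
//! #[derive(Clone, Debug, PartialEq)]
//! struct Info {
//!     socket: SocketAddr,
//!     timestamp: u64,
//! }
//!
//! // Stores `incoming` only if it is strictly newer than the current record for `key`.
//! // Returns whether the record changed.
//! fn update_record(records: &mut HashMap<Key, Info>, key: Key, incoming: Info) -> bool {
//!     let is_newer = records
//!         .get(&key)
//!         .map_or(true, |current| incoming.timestamp > current.timestamp);
//!     if is_newer {
//!         records.insert(key, incoming);
//!     }
//!     is_newer
//! }
//! ```
//!
//! Using a strict comparison means a replayed `PeerInfo` with an equal timestamp never
//! overwrites the stored record.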
//!
//! To initiate the discovery process, a peer needs a list of `bootstrappers` (defined in
//! [`Config`]) - known peer public keys and their corresponding socket addresses. The peer
//! attempts to dial these bootstrappers, performs the handshake, sends its own `PeerInfo`, and
//! then sends a `BitVec` for the relevant peer set(s) (initially only knowing its own address,
//! marked as '1'). It then waits for responses, learning about other peers through the
//! `Payload::Peers` messages received. Bootstrapper information is persisted, and connections to
//! them are maintained even if they aren't part of any currently tracked peer sets. Different
//! peers can have different bootstrapper lists.
//!
//! _Note: If a peer (listener) receives a connection request from another peer (dialer) that
//! belongs to a registered peer set, the listener will accept the connection, even if the
//! listener itself hasn't yet learned about that specific peer set (or has an older version). The
//! core requirement is that the listener recognizes the *dialer's public key* as belonging to
//! *some* authorized set it tracks (see `actors::tracker::Actor`). This mechanism allows peers
//! with more up-to-date peer set information to connect and propagate that information, enabling
//! the listener to potentially learn about newer sets it is part of._
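//!
//! The acceptance rule in the note above can be pictured with the sketch below. It is only an
//! illustration: `Key`, `should_accept`, and the `BTreeMap` layout are hypothetical, and the
//! actual check lives in `actors::tracker::Actor`.
//!
//! ```rust
//! use std::collections::{BTreeMap, BTreeSet};
//!
//! // Hypothetical stand-in for a `PublicKey`.
//! type Key = [u8; 4];
//!
//! // Accept a dialer if its key appears in any tracked peer set, whatever the set's index.
//! fn should_accept(dialer: &Key, tracked: &BTreeMap<u64, BTreeSet<Key>>) -> bool {
//!     tracked.values().any(|set| set.contains(dialer))
//! }
//! ```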
//!
//! ## Messages
//!
//! Application-level data is exchanged using the `Payload::Data` message type, which wraps an
//! internal `Data` structure. This structure contains:
//! - `channel`: A `u32` identifier used to route the message to the correct application handler.
//! - `message`: The arbitrary application payload as `Bytes`.
//!
//! The size of the `message` bytes (after potential compression) must not exceed the configured
//! `max_message_size`. If it does, the sending operation will fail with
//! [`Error::MessageTooLarge`]. Messages can be sent with `priority`, allowing certain
//! communications to potentially bypass lower-priority messages waiting in send queues across all
//! channels. Each registered channel ([`Sender`], [`Receiver`]) handles its own message queuing,
//! rate limiting, and optional `zstd` compression/decompression.
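//!
//! To make the size limit and the `priority` flag more concrete, here is a small sketch of a
//! two-tier send queue. It is an assumption-laden illustration, not how this crate queues
//! messages: `Outbox` and `SendError` are hypothetical, the size check ignores compression,
//! and rate limiting is left out.
//!
//! ```rust
//! use std::collections::VecDeque;
//!
//! // Hypothetical error type mirroring the shape of `Error::MessageTooLarge`.
//! #[derive(Debug, PartialEq)]
//! enum SendError {
//!     MessageTooLarge(usize),
//! }
//!
//! // Hypothetical send queue with one high-priority and one low-priority lane.
//! struct Outbox {
//!     max_message_size: usize,
//!     high: VecDeque<Vec<u8>>,
//!     low: VecDeque<Vec<u8>>,
//! }
//!
//! impl Outbox {
//!     fn new(max_message_size: usize) -> Self {
//!         Self { max_message_size, high: VecDeque::new(), low: VecDeque::new() }
//!     }
//!
//!     // Reject oversized messages up front, then enqueue in the requested lane.
//!     fn push(&mut self, message: Vec<u8>, priority: bool) -> Result<(), SendError> {
//!         if message.len() > self.max_message_size {
//!             return Err(SendError::MessageTooLarge(message.len()));
//!         }
//!         if priority {
//!             self.high.push_back(message);
//!         } else {
//!             self.low.push_back(message);
//!         }
//!         Ok(())
//!     }
//!
//!     // Priority messages are drained first, so they bypass queued low-priority ones.
//!     fn pop(&mut self) -> Option<Vec<u8>> {
//!         self.high.pop_front().or_else(|| self.low.pop_front())
//!     }
//! }
//! ```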
//!
//! # Example
//!
//! ```rust
//! use commonware_p2p::{authenticated::{self, Network}, Sender, Recipients};
//! use commonware_cryptography::{Ed25519, Signer, Verifier};
//! use commonware_runtime::{tokio, Spawner, Runner, Metrics};
//! use governor::Quota;
//! use std::net::{IpAddr, Ipv4Addr, SocketAddr};
//! use std::num::NonZeroU32;
//!
//! // Configure context
//! let runtime_cfg = tokio::Config::default();
//! let runner = tokio::Runner::new(runtime_cfg.clone());
//!
//! // Generate identity
//! //
//! // In production, the signer should be generated from a secure source of entropy.
//! let signer = Ed25519::from_seed(0);
//!
//! // Generate peers
//! //
//! // In production, peer identities will be provided by some external source of truth
//! // (like the staking set of a blockchain).
//! let peer1 = Ed25519::from_seed(1).public_key();
//! let peer2 = Ed25519::from_seed(2).public_key();
//! let peer3 = Ed25519::from_seed(3).public_key();
//!
//! // Configure bootstrappers
//! //
//! // In production, it is likely that the address of bootstrappers will be some public address.
//! let bootstrappers = vec![(peer1.clone(), SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3001))];
//!
//! // Configure namespace
//! //
//! // In production, use a unique application namespace to prevent cryptographic replay attacks.
//! let application_namespace = b"my-app-namespace";
//!
//! // Configure network
//! //
//! // In production, use a more conservative configuration like `Config::recommended`.
//! const MAX_MESSAGE_SIZE: usize = 1_024; // 1KB
//! let p2p_cfg = authenticated::Config::aggressive(
//!     signer.clone(),
//!     application_namespace,
//!     SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 3000),
//!     SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3000), // Use a specific dialable addr
//!     bootstrappers,
//!     MAX_MESSAGE_SIZE,
//! );
//!
//! // Start context
//! runner.start(|context| async move {
//!     // Initialize network
//!     let (mut network, mut oracle) = Network::new(context.with_label("network"), p2p_cfg);
//!
//!     // Register authorized peers
//!     //
//!     // In production, this would be updated as new peer sets are created (like when
//!     // the composition of a validator set changes).
//!     oracle.register(0, vec![signer.public_key(), peer1, peer2, peer3]).await;
//!
//!     // Register some channel
//!     const MAX_MESSAGE_BACKLOG: usize = 128;
//!     const COMPRESSION_LEVEL: Option<i32> = Some(3);
//!     let (mut sender, receiver) = network.register(
//!         0,
//!         Quota::per_second(NonZeroU32::new(1).unwrap()),
//!         MAX_MESSAGE_BACKLOG,
//!         COMPRESSION_LEVEL,
//!     );
//!
//!     // Run network
//!     let network_handler = network.start();
//!
//!     // Example: Use sender
//!     let _ = sender.send(Recipients::All, bytes::Bytes::from_static(b"hello"), false).await;
//!
//!     // Shutdown network
//!     network_handler.abort();
//! });
//! ```

mod actors;
mod channels;
mod config;
mod ip;
mod metrics;
mod network;
mod types;

use thiserror::Error;

/// Errors that can occur when interacting with the network.
#[derive(Error, Debug)]
pub enum Error {
    #[error("message too large: {0}")]
    MessageTooLarge(usize),
    #[error("compression failed")]
    CompressionFailed,
    #[error("decompression failed")]
    DecompressionFailed,
    #[error("network closed")]
    NetworkClosed,
}

pub use actors::tracker::Oracle;
pub use channels::{Receiver, Sender};
pub use config::{Bootstrapper, Config};
pub use network::Network;

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Receiver, Recipients, Sender};
    use commonware_cryptography::{Ed25519, Signer};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        deterministic, tokio, Clock, Metrics, Network as RNetwork, Runner, Spawner,
    };
    use governor::{clock::ReasonablyRealtime, Quota};
    use rand::{CryptoRng, Rng};
    use std::collections::HashSet;
    use std::{
        net::{IpAddr, Ipv4Addr, SocketAddr},
        num::NonZeroU32,
        time::Duration,
    };

    #[derive(Copy, Clone)]
    enum Mode {
        All,
        Some,
        One,
    }

    const DEFAULT_MESSAGE_BACKLOG: usize = 128;

    /// Test connectivity between `n` peers.
    ///
    /// We set a unique `base_port` for each test to avoid "address already in use"
    /// errors when tests are run immediately after each other.
    async fn run_network(
        context: impl Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics,
        max_message_size: usize,
        base_port: u16,
        n: usize,
        mode: Mode,
    ) {
        // Create peers
        let mut peers = Vec::new();
        for i in 0..n {
            peers.push(Ed25519::from_seed(i as u64));
        }
        let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

        // Create networks
        let mut waiters = Vec::new();
        for (i, peer) in peers.iter().enumerate() {
            // Create peer context
            let context = context.with_label(&format!("peer-{}", i));

            // Derive port
            let port = base_port + i as u16;

            // Create bootstrappers
            let mut bootstrappers = Vec::new();
            if i > 0 {
                bootstrappers.push((
                    addresses[0].clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
                ));
            }

            // Create network
            let signer = peer.clone();
            let config = Config::test(
                signer.clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                bootstrappers,
                max_message_size,
            );
            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

            // Register peers
            oracle.register(0, addresses.clone()).await;

            // Register basic application
            let (mut sender, mut receiver) = network.register(
                0,
                Quota::per_second(NonZeroU32::new(5).unwrap()), // Ensure we hit the rate limit
                DEFAULT_MESSAGE_BACKLOG,
                None,
            );

            // Wait to connect to all peers, and then send messages to everyone
            network.start();

            // Send/Receive messages
            let handler = context.with_label("agent").spawn({
                let addresses = addresses.clone();
                move |context| async move {
                    // Wait for all peers to send their identity
                    let acker = context
                        .clone()
                        .with_label("receiver")
                        .spawn(move |_| async move {
                            let mut received = HashSet::new();
                            while received.len() < n - 1 {
                                // Ensure message equals sender identity
                                let (sender, message) = receiver.recv().await.unwrap();
                                assert_eq!(sender.as_ref(), message.as_ref());

                                // Add to received set
                                received.insert(sender);
                            }
                        });

                    // Send identity to all peers
                    let msg = signer.public_key();
                    match mode {
                        Mode::One => {
                            for (j, recipient) in addresses.iter().enumerate() {
                                // Don't send message to self
                                if i == j {
                                    continue;
                                }

                                // Loop until success
                                loop {
                                    let sent = sender
                                        .send(
                                            Recipients::One(recipient.clone()),
                                            msg.to_vec().into(),
                                            true,
                                        )
                                        .await
                                        .unwrap();
                                    if sent.len() != 1 {
                                        context.sleep(Duration::from_millis(100)).await;
                                        continue;
                                    }
                                    assert_eq!(&sent[0], recipient);
                                    break;
                                }
                            }
                        }
                        Mode::Some => {
                            // Get all peers not including self
                            let mut recipients = addresses.clone();
                            recipients.remove(i);
                            recipients.sort();

                            // Loop until the sends to all peers succeed
                            loop {
                                let mut sent = sender
                                    .send(
                                        Recipients::Some(recipients.clone()),
                                        msg.to_vec().into(),
                                        true,
                                    )
                                    .await
                                    .unwrap();
                                if sent.len() != n - 1 {
                                    context.sleep(Duration::from_millis(100)).await;
                                    continue;
                                }

                                // Compare to expected
                                sent.sort();
                                assert_eq!(sent, recipients);
                                break;
                            }
                        }
                        Mode::All => {
                            // Get all peers not including self
                            let mut recipients = addresses.clone();
                            recipients.remove(i);
                            recipients.sort();

                            // Loop until the sends to all peers succeed
                            loop {
                                let mut sent = sender
                                    .send(Recipients::All, msg.to_vec().into(), true)
                                    .await
                                    .unwrap();
                                if sent.len() != n - 1 {
                                    context.sleep(Duration::from_millis(100)).await;
                                    continue;
                                }

                                // Compare to expected
                                sent.sort();
                                assert_eq!(sent, recipients);
                                break;
                            }
                        }
                    };

                    // Wait for all peers to send their identity
                    acker.await.unwrap();
                }
            });

            // Add to waiters
            waiters.push(handler);
        }

        // Wait for all peers to finish
        for waiter in waiters {
            waiter.await.unwrap();
        }
    }

    fn run_deterministic_test(seed: u64, mode: Mode) {
        // Configure test
        const MAX_MESSAGE_SIZE: usize = 1_024 * 1_024; // 1MB
        const NUM_PEERS: usize = 25;
        const BASE_PORT: u16 = 3000;

        // Run first instance
        let executor = deterministic::Runner::seeded(seed);
        let state = executor.start(|context| async move {
            run_network(
                context.clone(),
                MAX_MESSAGE_SIZE,
                BASE_PORT,
                NUM_PEERS,
                mode,
            )
            .await;
            context.auditor().state()
        });

        // Compare result to second instance
        let executor = deterministic::Runner::seeded(seed);
        let state2 = executor.start(|context| async move {
            run_network(
                context.clone(),
                MAX_MESSAGE_SIZE,
                BASE_PORT,
                NUM_PEERS,
                mode,
            )
            .await;
            context.auditor().state()
        });
        assert_eq!(state, state2);
    }

    #[test_traced]
    fn test_determinism_one() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::One);
        }
    }

    #[test_traced]
    fn test_determinism_some() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::Some);
        }
    }

    #[test_traced]
    fn test_determinism_all() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::All);
        }
    }

    #[test_traced]
    fn test_tokio_connectivity() {
        let cfg = tokio::Config::default();
        let executor = tokio::Runner::new(cfg.clone());
        executor.start(|context| async move {
            const MAX_MESSAGE_SIZE: usize = 1_024 * 1_024; // 1MB
            let base_port = 3000;
            let n = 10;
            run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
        });
    }

    #[test_traced]
    fn test_multi_index_oracle() {
        // Configure test
        let base_port = 3000;
        let n: usize = 100;

        // Initialize context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(Ed25519::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Create networks
            let mut waiters = Vec::new();
            for (i, peer) in peers.iter().enumerate() {
                // Create peer context
                let context = context.with_label(&format!("peer-{}", i));

                // Derive port
                let port = base_port + i as u16;

                // Create bootstrappers
                let mut bootstrappers = Vec::new();
                if i > 0 {
                    bootstrappers.push((
                        addresses[0].clone(),
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
                    ));
                }

                // Create network
                let signer = peer.clone();
                let config = Config::test(
                    signer.clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                    bootstrappers,
                    1_024 * 1_024, // 1MB
                );
                let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

                // Register peers at separate indices
                oracle.register(0, vec![addresses[0].clone()]).await;
                oracle
                    .register(1, vec![addresses[1].clone(), addresses[2].clone()])
                    .await;
                oracle
                    .register(2, addresses.iter().skip(2).cloned().collect())
                    .await;

                // Register basic application
                let (mut sender, mut receiver) = network.register(
                    0,
                    Quota::per_second(NonZeroU32::new(10).unwrap()),
                    DEFAULT_MESSAGE_BACKLOG,
                    None,
                );

                // Wait to connect to all peers, and then send messages to everyone
                network.start();

                // Send/Receive messages
                let handler = context
                    .with_label("agent")
                    .spawn(move |context| async move {
                        if i == 0 {
                            // Loop until success
                            let msg = signer.public_key();
                            loop {
                                if sender
                                    .send(Recipients::All, msg.to_vec().into(), true)
                                    .await
                                    .unwrap()
                                    .len()
                                    == n - 1
                                {
                                    break;
                                }

                                // Sleep and try again (avoid busy loop)
                                context.sleep(Duration::from_millis(100)).await;
                            }
                        } else {
                            // Ensure message equals sender identity
                            let (sender, message) = receiver.recv().await.unwrap();
                            assert_eq!(sender.as_ref(), message.as_ref());
                        }
                    });

                // Add to waiters
                waiters.push(handler);
            }

            // Wait for waiters to finish (receiver before sender)
            for waiter in waiters.into_iter().rev() {
                waiter.await.unwrap();
            }
        });
    }

    fn test_message_too_large(compression: Option<i32>) {
        // Configure test
        let base_port = 3000;
        let n: usize = 2;

        // Initialize context
        let executor = deterministic::Runner::seeded(0);
        executor.start(|mut context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(Ed25519::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Create network
            let signer = peers[0].clone();
            let config = Config::test(
                signer.clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
                Vec::new(),
                1_024 * 1_024, // 1MB
            );
            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

            // Register peers
            oracle.register(0, addresses.clone()).await;

            // Register basic application
            let (mut sender, _) = network.register(
                0,
                Quota::per_second(NonZeroU32::new(10).unwrap()),
                DEFAULT_MESSAGE_BACKLOG,
                compression,
            );

            // Wait to connect to all peers, and then send messages to everyone
            network.start();

            // Create random message
            let mut msg = vec![0u8; 10 * 1024 * 1024]; // 10MB (greater than frame capacity)
            context.fill(&mut msg[..]);

            // Send message
            let recipient = Recipients::One(addresses[1].clone());
            let result = sender.send(recipient, msg.into(), true).await;
            assert!(matches!(result, Err(Error::MessageTooLarge(_))));
        });
    }

    #[test_traced]
    fn test_message_too_large_no_compression() {
        test_message_too_large(None);
    }

    #[test_traced]
    fn test_message_too_large_compression() {
        test_message_too_large(Some(3));
    }
}