//! Communicate with a fixed set of authenticated peers over encrypted connections.
//!
//! `authenticated` provides multiplexed communication between fully-connected peers
//! identified by a developer-specified cryptographic identity (e.g., BLS, ed25519).
//! Peer discovery occurs automatically using ordered bit vectors (sorted by authorized
//! cryptographic identities) to efficiently communicate knowledge of dialable peers.
//!
//! # Features
//!
//! - Configurable Cryptography Scheme for Peer Identities (BLS, ed25519, etc.)
//! - Automatic Peer Discovery Using Bit Vectors (Also Used as Ping Messages)
//! - Multiplexing With Configurable Rate Limiting Per Channel and Send Prioritization
//! - Optional Message Compression (using `zstd`)
//!
//! # Design
//!
//! ## Discovery
//!
//! Peer discovery operates under the assumption that all peers are aware of and synchronized on
//! the composition of peer sets at specific, user-provided indices (`u64`). Each index maps to a
//! list of authorized `PublicKey`s (`(u64, Vec<PublicKey>)`). Based on this shared knowledge, each
//! peer can construct a sorted bit vector message (`BitVec`) representing its knowledge of the
//! dialable addresses [`SocketAddr`](std::net::SocketAddr) for the peers in that set.
//! The `BitVec` message contains:
//! - `index`: The `u64` index the bit vector applies to.
//! - `bits`: The bit vector itself, where a '1' signifies knowledge of the corresponding
//!   peer's address in the sorted list for that index.
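//!
//! As a rough illustration of how such bits could be derived, the sketch below walks a peer
//! set in sorted order and marks each position whose address is known. The `PublicKey` alias
//! and the `knowledge_bits` helper are hypothetical stand-ins for this example only; they are
//! not this crate's types or API.
//!
//! ```rust
//! use std::collections::{BTreeSet, HashMap};
//! use std::net::SocketAddr;
//!
//! /// Hypothetical stand-in for a peer's public key (not the crate's type).
//! type PublicKey = [u8; 32];
//!
//! /// Returns one flag per peer, in sorted key order: `true` when a dialable
//! /// address is known for that peer, `false` otherwise.
//! fn knowledge_bits(
//!     peer_set: &BTreeSet<PublicKey>,
//!     known: &HashMap<PublicKey, SocketAddr>,
//! ) -> Vec<bool> {
//!     // `BTreeSet` iterates in ascending order, so every peer that agrees on
//!     // the set derives the same position for each key.
//!     peer_set.iter().map(|key| known.contains_key(key)).collect()
//! }
//! ```
//!
//! Because positions depend only on the sorted set, two peers can compare bits without
//! exchanging any public keys, provided they agree on the set at that index.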
//!
//! Even if a peer has some knowledge of the address of a peer in the set, if they fail to dial that
//! peer a (configurable) number of times, they will assume that information is stale and mark it as
//! '0' (unknown) in the bit vector. This is done to prevent peers from being "stuck" with outdated
//! information about other peers.
//!
//! _Warning: If peers are not synchronized on the peer set composition at a given index,
//! discovery messages can be misinterpreted. A peer might associate a bit vector index with the
//! wrong peer or fail to parse the vector if its length doesn't match the expected set size. The
//! application layer is responsible for ensuring peer set synchronization._
//!
//! Due to their small size, these `BitVec` messages are exchanged periodically (configured by
//! `gossip_bit_vec_frequency` in the [`Config`]) between connected peers. This serves as both a
//! peer discovery mechanism and a keep-alive "ping" message to maintain the underlying
//! connection, especially during periods of low application-level traffic. The protocol supports
//! tracking multiple peer sets concurrently (up to `tracked_peer_sets`), each identified by its
//! `index`. This is useful, for instance, during transitions like distributed key generation
//! (DKG) where connections to both old and new peer sets are needed simultaneously.
//!
//! Upon receiving a `BitVec` message, a peer compares it against its own knowledge for the same
//! index. If the receiving peer knows addresses that the sender marked as '0' (unknown), it
//! selects a random subset of these known `PeerInfo` structures (up to `peer_gossip_max_count`)
//! and sends them back in a `Payload::Peers` message. To save bandwidth, peers will only gossip
//! `PeerInfo` for peers that they currently have a connection with. This prevents them from
//! repeatedly sending `PeerInfo` that they cannot verify is still valid. Each `PeerInfo` contains:
//! - `socket`: The [`SocketAddr`](std::net::SocketAddr) of the peer.
//! - `timestamp`: A `u64` timestamp indicating when the address was attested.
//! - `public_key`: The peer's public key.
//! - `signature`: The peer's cryptographic signature over the `socket` and `timestamp`.
//!
//! If the receiver doesn't know any addresses the sender is unaware of, it sends no
//! `Payload::Peers` response; the received `BitVec` implicitly acts as a "pong".
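//!
//! The comparison step can be sketched as follows. This is a simplified illustration, not the
//! crate's implementation: `gossip_candidates` is a hypothetical helper, it takes the first
//! `max_count` matches instead of a random subset, and it does not filter for peers with a
//! live connection.
//!
//! ```rust
//! /// Returns the positions that the sender marked unknown (`false`) but that
//! /// we know (`true`), capped at `max_count`. Returns `None` when the two
//! /// vectors differ in length, which signals disagreement on the peer set.
//! fn gossip_candidates(
//!     ours: &[bool],
//!     theirs: &[bool],
//!     max_count: usize,
//! ) -> Option<Vec<usize>> {
//!     if ours.len() != theirs.len() {
//!         return None;
//!     }
//!     Some(
//!         ours.iter()
//!             .zip(theirs)
//!             .enumerate()
//!             .filter(|(_, (mine, their))| **mine && !**their)
//!             .map(|(position, _)| position)
//!             .take(max_count)
//!             .collect(),
//!     )
//! }
//! ```
//!
//! An empty result corresponds to the "pong" case above: nothing is sent back.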
//!
//! If a peer receives a `PeerInfo` message (either directly or through gossip) containing a more
//! recent timestamp for a known peer's address, it updates its local `Record`. This updated
//! `PeerInfo` is also used in future gossip messages. Each peer generates its own signed
//! `PeerInfo` upon startup and sends it immediately after establishing a connection (following
//! the cryptographic handshake). This ensures that if a peer connects using an outdated address
//! record, it will be corrected promptly by the peer being dialed.
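//!
//! The "newest timestamp wins" rule can be sketched like this. The `Record` struct and
//! `update_record` helper below are simplified, hypothetical versions written for this
//! example; signature verification, which a real implementation needs before trusting a
//! `PeerInfo`, is deliberately left out.
//!
//! ```rust
//! use std::collections::HashMap;
//! use std::net::SocketAddr;
//!
//! /// Hypothetical stand-in for a peer's public key (not the crate's type).
//! type PublicKey = [u8; 32];
//!
//! /// Simplified address record: the latest attested socket and its timestamp.
//! struct Record {
//!     socket: SocketAddr,
//!     timestamp: u64,
//! }
//!
//! /// Stores the address if the peer is unknown or the timestamp is strictly
//! /// newer than the stored one. Returns whether the record changed.
//! fn update_record(
//!     records: &mut HashMap<PublicKey, Record>,
//!     key: PublicKey,
//!     socket: SocketAddr,
//!     timestamp: u64,
//! ) -> bool {
//!     let is_newer = records
//!         .get(&key)
//!         .map_or(true, |existing| timestamp > existing.timestamp);
//!     if is_newer {
//!         records.insert(key, Record { socket, timestamp });
//!     }
//!     is_newer
//! }
//! ```
//!
//! Rejecting equal or older timestamps keeps a replayed `PeerInfo` from overwriting a fresher
//! address.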
//!
//! To initiate the discovery process, a peer needs a list of `bootstrappers` (defined in
//! [`Config`]) - known peer public keys and their corresponding socket addresses. The peer
//! attempts to dial these bootstrappers, performs the handshake, sends its own `PeerInfo`, and
//! then sends a `BitVec` for the relevant peer set(s) (initially only knowing its own address,
//! marked as '1'). It then waits for responses, learning about other peers through the
//! `Payload::Peers` messages received. Bootstrapper information is persisted, and connections to
//! them are maintained even if they aren't part of any currently tracked peer sets. Different
//! peers can have different bootstrapper lists.
//!
//! _Note: If a peer (listener) receives a connection request from another peer (dialer) that
//! belongs to a registered peer set, the listener will accept the connection, even if the
//! listener itself hasn't yet learned about that specific peer set (or has an older version). The
//! core requirement is that the listener recognizes the *dialer's public key* as belonging to
//! *some* authorized set it tracks (see `actors::tracker::Actor`). This mechanism allows peers
//! with more up-to-date peer set information to connect and propagate that information, enabling
//! the listener to potentially learn about newer sets it is part of._
//!
//! ## Messages
//!
//! Application-level data is exchanged using the `Payload::Data` message type, which wraps an
//! internal `Data` structure. This structure contains:
//! - `channel`: A `u32` identifier used to route the message to the correct application handler.
//! - `message`: The arbitrary application payload as `Bytes`.
//!
//! The size of the `message` bytes (after potential compression) must not exceed the configured
//! `max_message_size`. If it does, the sending operation will fail with
//! [`Error::MessageTooLarge`]. Messages can be sent with `priority`, allowing certain
//! communications to potentially bypass lower-priority messages waiting in send queues across all
//! channels. Each registered channel ([`Sender`], [`Receiver`]) handles its own message queuing,
//! rate limiting, and optional `zstd` compression/decompression.
//!
//! # Example
//!
//! ```rust
//! use commonware_p2p::{authenticated::{self, Network}, Sender, Recipients};
//! use commonware_cryptography::{ed25519, Signer, PrivateKey as _, PublicKey as _, PrivateKeyExt as _};
//! use commonware_runtime::{tokio, Spawner, Runner, Metrics};
//! use commonware_utils::NZU32;
//! use governor::Quota;
//! use std::net::{IpAddr, Ipv4Addr, SocketAddr};
//!
//! // Configure context
//! let runtime_cfg = tokio::Config::default();
//! let runner = tokio::Runner::new(runtime_cfg.clone());
//!
//! // Generate identity
//! //
//! // In production, the signer should be generated from a secure source of entropy.
//! let signer = ed25519::PrivateKey::from_seed(0);
//!
//! // Generate peers
//! //
//! // In production, peer identities will be provided by some external source of truth
//! // (like the staking set of a blockchain).
//! let peer1 = ed25519::PrivateKey::from_seed(1).public_key();
//! let peer2 = ed25519::PrivateKey::from_seed(2).public_key();
//! let peer3 = ed25519::PrivateKey::from_seed(3).public_key();
//!
//! // Configure bootstrappers
//! //
//! // In production, it is likely that the address of bootstrappers will be some public address.
//! let bootstrappers = vec![(peer1.clone(), SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3001))];
//!
//! // Configure namespace
//! //
//! // In production, use a unique application namespace to prevent cryptographic replay attacks.
//! let application_namespace = b"my-app-namespace";
//!
//! // Configure network
//! //
//! // In production, use a more conservative configuration like `Config::recommended`.
//! const MAX_MESSAGE_SIZE: usize = 1_024; // 1KB
//! let p2p_cfg = authenticated::Config::aggressive(
//!     signer.clone(),
//!     application_namespace,
//!     SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 3000),
//!     SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3000), // Use a specific dialable addr
//!     bootstrappers,
//!     MAX_MESSAGE_SIZE,
//! );
//!
//! // Start context
//! runner.start(|context| async move {
//!     // Initialize network
//!     let (mut network, mut oracle) = Network::new(context.with_label("network"), p2p_cfg);
//!
//!     // Register authorized peers
//!     //
//!     // In production, this would be updated as new peer sets are created (like when
//!     // the composition of a validator set changes).
//!     oracle.register(0, vec![signer.public_key(), peer1, peer2, peer3]).await;
//!
//!     // Register some channel
//!     const MAX_MESSAGE_BACKLOG: usize = 128;
//!     const COMPRESSION_LEVEL: Option<i32> = Some(3);
//!     let (mut sender, receiver) = network.register(
//!         0,
//!         Quota::per_second(NZU32!(1)),
//!         MAX_MESSAGE_BACKLOG,
//!         COMPRESSION_LEVEL,
//!     );
//!
//!     // Run network
//!     let network_handler = network.start();
//!
//!     // Example: Use sender
//!     let _ = sender.send(Recipients::All, bytes::Bytes::from_static(b"hello"), false).await;
//!
//!     // Shutdown network
//!     network_handler.abort();
//! });
//! ```

mod actors;
mod channels;
mod config;
mod ip;
mod metrics;
mod network;
mod types;

use thiserror::Error;

/// Errors that can occur when interacting with the network.
#[derive(Error, Debug)]
pub enum Error {
    #[error("message too large: {0}")]
    MessageTooLarge(usize),
    #[error("compression failed")]
    CompressionFailed,
    #[error("decompression failed")]
    DecompressionFailed,
    #[error("network closed")]
    NetworkClosed,
}

pub use actors::tracker::Oracle;
pub use channels::{Receiver, Sender};
pub use config::{Bootstrapper, Config};
pub use network::Network;

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Receiver, Recipients, Sender};
    use commonware_cryptography::{ed25519, PrivateKeyExt as _, Signer as _};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        deterministic, tokio, Clock, Metrics, Network as RNetwork, Runner, Spawner,
    };
    use commonware_utils::NZU32;
    use governor::{clock::ReasonablyRealtime, Quota};
    use rand::{CryptoRng, Rng};
    use std::collections::HashSet;
    use std::{
        net::{IpAddr, Ipv4Addr, SocketAddr},
        time::Duration,
    };

    #[derive(Copy, Clone)]
    enum Mode {
        All,
        Some,
        One,
    }

    const DEFAULT_MESSAGE_BACKLOG: usize = 128;

    /// Test connectivity between `n` peers.
    ///
    /// We set a unique `base_port` for each test to avoid "address already in use"
    /// errors when tests are run immediately after each other.
    async fn run_network(
        context: impl Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics,
        max_message_size: usize,
        base_port: u16,
        n: usize,
        mode: Mode,
    ) {
        // Create peers
        let mut peers = Vec::new();
        for i in 0..n {
            peers.push(ed25519::PrivateKey::from_seed(i as u64));
        }
        let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

        // Create networks
        let mut waiters = Vec::new();
        for (i, peer) in peers.iter().enumerate() {
            // Create peer context
            let context = context.with_label(&format!("peer-{}", i));

            // Derive port
            let port = base_port + i as u16;

            // Create bootstrappers
            let mut bootstrappers = Vec::new();
            if i > 0 {
                bootstrappers.push((
                    addresses[0].clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
                ));
            }

            // Create network
            let signer = peer.clone();
            let config = Config::test(
                signer.clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                bootstrappers,
                max_message_size,
            );
            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

            // Register peers
            oracle.register(0, addresses.clone()).await;

            // Register basic application
            let (mut sender, mut receiver) = network.register(
                0,
                Quota::per_second(NZU32!(5)), // Ensure we hit the rate limit
                DEFAULT_MESSAGE_BACKLOG,
                None,
            );

            // Wait to connect to all peers, and then send messages to everyone
            network.start();

            // Send/Receive messages
            let handler = context.with_label("agent").spawn({
                let addresses = addresses.clone();
                move |context| async move {
                    // Wait for all peers to send their identity
                    let acker = context.with_label("receiver").spawn(move |_| async move {
                        let mut received = HashSet::new();
                        while received.len() < n - 1 {
                            // Ensure message equals sender identity
                            let (sender, message) = receiver.recv().await.unwrap();
                            assert_eq!(sender.as_ref(), message.as_ref());

                            // Add to received set
                            received.insert(sender);
                        }
                    });

                    // Send identity to all peers
                    let msg = signer.public_key();
                    match mode {
                        Mode::One => {
                            for (j, recipient) in addresses.iter().enumerate() {
                                // Don't send message to self
                                if i == j {
                                    continue;
                                }

                                // Loop until success
                                loop {
                                    let sent = sender
                                        .send(
                                            Recipients::One(recipient.clone()),
                                            msg.to_vec().into(),
                                            true,
                                        )
                                        .await
                                        .unwrap();
                                    if sent.len() != 1 {
                                        context.sleep(Duration::from_millis(100)).await;
                                        continue;
                                    }
                                    assert_eq!(&sent[0], recipient);
                                    break;
                                }
                            }
                        }
                        Mode::Some => {
                            // Get all peers not including self
                            let mut recipients = addresses.clone();
                            recipients.remove(i);
                            recipients.sort();

                            // Loop until all peer sends successful
                            loop {
                                let mut sent = sender
                                    .send(
                                        Recipients::Some(recipients.clone()),
                                        msg.to_vec().into(),
                                        true,
                                    )
                                    .await
                                    .unwrap();
                                if sent.len() != n - 1 {
                                    context.sleep(Duration::from_millis(100)).await;
                                    continue;
                                }

                                // Compare to expected
                                sent.sort();
                                assert_eq!(sent, recipients);
                                break;
                            }
                        }
                        Mode::All => {
                            // Get all peers not including self
                            let mut recipients = addresses.clone();
                            recipients.remove(i);
                            recipients.sort();

                            // Loop until all peer sends successful
                            loop {
                                let mut sent = sender
                                    .send(Recipients::All, msg.to_vec().into(), true)
                                    .await
                                    .unwrap();
                                if sent.len() != n - 1 {
                                    context.sleep(Duration::from_millis(100)).await;
                                    continue;
                                }

                                // Compare to expected
                                sent.sort();
                                assert_eq!(sent, recipients);
                                break;
                            }
                        }
                    };

                    // Wait for all peers to send their identity
                    acker.await.unwrap();
                }
            });

            // Add to waiters
            waiters.push(handler);
        }

        // Wait for all peers to finish
        for waiter in waiters {
            waiter.await.unwrap();
        }
    }

    fn run_deterministic_test(seed: u64, mode: Mode) {
        // Configure test
        const MAX_MESSAGE_SIZE: usize = 1_024 * 1_024; // 1MB
        const NUM_PEERS: usize = 25;
        const BASE_PORT: u16 = 3000;

        // Run first instance
        let executor = deterministic::Runner::seeded(seed);
        let state = executor.start(|context| async move {
            run_network(
                context.clone(),
                MAX_MESSAGE_SIZE,
                BASE_PORT,
                NUM_PEERS,
                mode,
            )
            .await;
            context.auditor().state()
        });

        // Compare result to second instance
        let executor = deterministic::Runner::seeded(seed);
        let state2 = executor.start(|context| async move {
            run_network(
                context.clone(),
                MAX_MESSAGE_SIZE,
                BASE_PORT,
                NUM_PEERS,
                mode,
            )
            .await;
            context.auditor().state()
        });
        assert_eq!(state, state2);
    }

    #[test_traced]
    #[ignore]
    fn test_determinism_one() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::One);
        }
    }

    #[test_traced]
    #[ignore]
    fn test_determinism_some() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::Some);
        }
    }

    #[test_traced]
    #[ignore]
    fn test_determinism_all() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::All);
        }
    }

    #[test_traced]
    fn test_tokio_connectivity() {
        let cfg = tokio::Config::default();
        let executor = tokio::Runner::new(cfg.clone());
        executor.start(|context| async move {
            const MAX_MESSAGE_SIZE: usize = 1_024 * 1_024; // 1MB
            let base_port = 3000;
            let n = 10;
            run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
        });
    }

    #[test_traced]
    fn test_multi_index_oracle() {
        // Configure test
        let base_port = 3000;
        let n: usize = 100;

        // Initialize context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Create networks
            let mut waiters = Vec::new();
            for (i, peer) in peers.iter().enumerate() {
                // Create peer context
                let context = context.with_label(&format!("peer-{}", i));

                // Derive port
                let port = base_port + i as u16;

                // Create bootstrappers
                let mut bootstrappers = Vec::new();
                if i > 0 {
                    bootstrappers.push((
                        addresses[0].clone(),
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
                    ));
                }

                // Create network
                let signer = peer.clone();
                let config = Config::test(
                    signer.clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                    bootstrappers,
                    1_024 * 1_024, // 1MB
                );
                let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

                // Register peers at separate indices
                oracle.register(0, vec![addresses[0].clone()]).await;
                oracle
                    .register(1, vec![addresses[1].clone(), addresses[2].clone()])
                    .await;
                oracle
                    .register(2, addresses.iter().skip(2).cloned().collect())
                    .await;

                // Register basic application
                let (mut sender, mut receiver) = network.register(
                    0,
                    Quota::per_second(NZU32!(10)),
                    DEFAULT_MESSAGE_BACKLOG,
                    None,
                );

                // Wait to connect to all peers, and then send messages to everyone
                network.start();

                // Send/Receive messages
                let handler = context
                    .with_label("agent")
                    .spawn(move |context| async move {
                        if i == 0 {
                            // Loop until success
                            let msg = signer.public_key();
                            loop {
                                if sender
                                    .send(Recipients::All, msg.to_vec().into(), true)
                                    .await
                                    .unwrap()
                                    .len()
                                    == n - 1
                                {
                                    break;
                                }

                                // Sleep and try again (avoid busy loop)
                                context.sleep(Duration::from_millis(100)).await;
                            }
                        } else {
                            // Ensure message equals sender identity
                            let (sender, message) = receiver.recv().await.unwrap();
                            assert_eq!(sender.as_ref(), message.as_ref());
                        }
                    });

                // Add to waiters
                waiters.push(handler);
            }

            // Wait for waiters to finish (receiver before sender)
            for waiter in waiters.into_iter().rev() {
                waiter.await.unwrap();
            }
        });
    }

    fn test_message_too_large(compression: Option<i32>) {
        // Configure test
        let base_port = 3000;
        let n: usize = 2;

        // Initialize context
        let executor = deterministic::Runner::seeded(0);
        executor.start(|mut context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Create network
            let signer = peers[0].clone();
            let config = Config::test(
                signer.clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
                Vec::new(),
                1_024 * 1_024, // 1MB
            );
            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

            // Register peers
            oracle.register(0, addresses.clone()).await;

            // Register basic application
            let (mut sender, _) = network.register(
                0,
                Quota::per_second(NZU32!(10)),
                DEFAULT_MESSAGE_BACKLOG,
                compression,
            );

            // Start the network
            network.start();

            // Create random message
            let mut msg = vec![0u8; 10 * 1024 * 1024]; // 10MB (greater than frame capacity)
            context.fill(&mut msg[..]);

            // Send message
            let recipient = Recipients::One(addresses[1].clone());
            let result = sender.send(recipient, msg.into(), true).await;
            assert!(matches!(result, Err(Error::MessageTooLarge(_))));
        });
    }

    #[test_traced]
    fn test_message_too_large_no_compression() {
        test_message_too_large(None);
    }

    #[test_traced]
    fn test_message_too_large_compression() {
        test_message_too_large(Some(3));
    }
}