// commonware_p2p/authenticated/mod.rs
//! Communicate with a fixed set of authenticated peers over encrypted connections.
//!
//! `authenticated` provides multiplexed communication between fully-connected peers
//! identified by a developer-specified cryptographic identity (e.g. BLS or ed25519).
//! Peer discovery occurs automatically using ordered bit vectors (sorted by authorized
//! cryptographic identities) to efficiently communicate knowledge of dialable peers.
//!
//! # Features
//!
//! - Configurable Cryptography Scheme for Peer Identities (BLS, ed25519, etc.)
//! - Automatic Peer Discovery Using Bit Vectors (Also Used as Ping Messages)
//! - Multiplexing With Configurable Rate Limiting Per Channel and Send Prioritization
//! - Optional Message Compression (using `zstd`)
//!
//! # Design
//!
//! ## Discovery
//!
//! Peer discovery operates under the assumption that all peers are aware of and synchronized on
//! the composition of peer sets at specific, user-provided indices (`u64`). Each index maps to a
//! list of authorized `PublicKey`s (`(u64, Vec<PublicKey>)`). Based on this shared knowledge, each
//! peer can construct a sorted bit vector message (`BitVec`) representing its knowledge of the
//! dialable addresses [`SocketAddr`](std::net::SocketAddr) for the peers in that set.
//! The `BitVec` message contains:
//! - `index`: The `u64` index the bit vector applies to.
//! - `bits`: The bit vector itself, where a '1' signifies knowledge of the corresponding
//!   peer's address in the sorted list for that index.
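//!
//! The construction above can be sketched as follows. This is a minimal, hypothetical
//! illustration rather than this crate's internal code: public keys are modeled as `u64`s, the
//! bit vector as a `Vec<bool>`, and `BitVecMsg` and `build_bit_vec` are invented names.
//!
//! ```rust
//! use std::collections::HashSet;
//!
//! /// Hypothetical stand-in for the `BitVec` discovery message.
//! #[derive(Debug, PartialEq)]
//! struct BitVecMsg {
//!     index: u64,
//!     bits: Vec<bool>,
//! }
//!
//! /// Hypothetical helper: sorts the peer set so every peer assigns the same bit position to
//! /// the same key, then sets a bit for each peer whose address is known.
//! fn build_bit_vec(index: u64, peer_set: &[u64], known: &HashSet<u64>) -> BitVecMsg {
//!     let mut sorted = peer_set.to_vec();
//!     sorted.sort_unstable();
//!     let bits = sorted.iter().map(|key| known.contains(key)).collect();
//!     BitVecMsg { index, bits }
//! }
//!
//! // Peer set at index 7 (deliberately unsorted) and the peers whose addresses we know.
//! let known: HashSet<u64> = [30, 10].into_iter().collect();
//! let msg = build_bit_vec(7, &[30, 20, 10], &known);
//! assert_eq!(msg, BitVecMsg { index: 7, bits: vec![true, false, true] });
//! ```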
//!
//! Even if a peer knows the address of another peer in the set, it treats that address as stale
//! after failing to dial the peer a configurable number of times, and marks it as '0' (unknown)
//! in its bit vector. This prevents peers from getting "stuck" with outdated information about
//! other peers.
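//!
//! A minimal sketch of this staleness rule follows. It assumes a hypothetical per-peer `Record`
//! holding an optional address and a dial-failure counter; `Record`, `dial_failed`, and the
//! `max_dial_failures` threshold are illustrative names, not this crate's API.
//!
//! ```rust
//! use std::net::SocketAddr;
//!
//! /// Hypothetical per-peer address record.
//! struct Record {
//!     address: Option<SocketAddr>,
//!     failed_dials: u32,
//! }
//!
//! impl Record {
//!     /// Whether this peer's bit should be '1' in outgoing bit vectors.
//!     fn known(&self) -> bool {
//!         self.address.is_some()
//!     }
//!
//!     /// Counts a failed dial. Once `max_dial_failures` is reached, the address is treated
//!     /// as stale and forgotten, and the counter restarts for the next address learned.
//!     fn dial_failed(&mut self, max_dial_failures: u32) {
//!         self.failed_dials += 1;
//!         if self.failed_dials >= max_dial_failures {
//!             self.address = None;
//!             self.failed_dials = 0;
//!         }
//!     }
//! }
//!
//! let mut record = Record {
//!     address: Some("127.0.0.1:3001".parse().unwrap()),
//!     failed_dials: 0,
//! };
//! record.dial_failed(2);
//! assert!(record.known()); // a single failure is tolerated
//! record.dial_failed(2);
//! assert!(!record.known()); // the second failure marks the address as stale ('0')
//! ```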
//!
//! _Warning: If peers are not synchronized on the peer set composition at a given index,
//! discovery messages can be misinterpreted. A peer might associate a bit position with the
//! wrong peer or fail to parse the vector if its length doesn't match the expected set size. The
//! application layer is responsible for ensuring peer set synchronization._
//!
//! Due to their small size, these `BitVec` messages are exchanged periodically (configured by
//! `gossip_bit_vec_frequency` in the [`Config`]) between connected peers. This serves as both a
//! peer discovery mechanism and a keep-alive "ping" message to maintain the underlying
//! connection, especially during periods of low application-level traffic. The protocol supports
//! tracking multiple peer sets concurrently (up to `tracked_peer_sets`), each identified by its
//! `index`. This is useful, for instance, during transitions like distributed key generation
//! (DKG) where connections to both old and new peer sets are needed simultaneously.
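//!
//! Tracking several peer sets at once can be sketched with an ordered map keyed by index. The
//! `Tracker` type below is hypothetical, keys are modeled as `u64`s, and the policy of evicting
//! the lowest index once more than `tracked_peer_sets` sets are registered is an assumption made
//! for illustration.
//!
//! ```rust
//! use std::collections::BTreeMap;
//!
//! /// Hypothetical tracker holding at most `tracked_peer_sets` peer sets.
//! struct Tracker {
//!     tracked_peer_sets: usize,
//!     sets: BTreeMap<u64, Vec<u64>>,
//! }
//!
//! impl Tracker {
//!     /// Stores the (sorted) peer set at `index`, then evicts the lowest indices until at
//!     /// most `tracked_peer_sets` remain.
//!     fn register(&mut self, index: u64, mut peers: Vec<u64>) {
//!         peers.sort_unstable();
//!         self.sets.insert(index, peers);
//!         while self.sets.len() > self.tracked_peer_sets {
//!             self.sets.pop_first();
//!         }
//!     }
//! }
//!
//! // During a DKG-style transition, the old (0) and new (1) sets are tracked together.
//! let mut tracker = Tracker { tracked_peer_sets: 2, sets: BTreeMap::new() };
//! tracker.register(0, vec![3, 1, 2]);
//! tracker.register(1, vec![2, 4, 3]);
//! assert_eq!(tracker.sets.keys().copied().collect::<Vec<_>>(), vec![0, 1]);
//!
//! // Registering a third set drops the oldest one.
//! tracker.register(2, vec![4, 5, 6]);
//! assert_eq!(tracker.sets.keys().copied().collect::<Vec<_>>(), vec![1, 2]);
//! ```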
//!
//! Upon receiving a `BitVec` message, a peer compares it against its own knowledge for the same
//! index. If the receiving peer knows addresses that the sender marked as '0' (unknown), it
//! selects a random subset of these known `PeerInfo` structures (up to `peer_gossip_max_count`)
//! and sends them back in a `Payload::Peers` message. To save bandwidth, peers will only gossip
//! `PeerInfo` for peers that they currently have a connection with. This prevents them from
//! repeatedly sending `PeerInfo` that they cannot verify is still valid. Each `PeerInfo` contains:
//! - `socket`: The [`SocketAddr`](std::net::SocketAddr) of the peer.
//! - `timestamp`: A `u64` timestamp indicating when the address was attested.
//! - `public_key`: The peer's public key.
//! - `signature`: The peer's cryptographic signature over the `socket` and `timestamp`.
//!
//! If the receiver doesn't know any addresses the sender is unaware of, it sends no
//! `Payload::Peers` response; the received `BitVec` implicitly acts as a "pong".
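//!
//! The response logic above can be sketched as a pure function. This is a hypothetical helper,
//! not this crate's API: keys are modeled as `u64`s, and where the protocol picks a *random*
//! subset of up to `peer_gossip_max_count` candidates, the sketch simply takes the first ones to
//! stay deterministic and dependency-free.
//!
//! ```rust
//! use std::collections::HashSet;
//!
//! /// Hypothetical: returns the peers to include in a `Payload::Peers` reply, or `None` if the
//! /// received bits do not match the size of our sorted set. An empty list means no reply, so
//! /// the received bit vector acts as a "pong".
//! fn select_peers_to_gossip(
//!     sorted_set: &[u64],
//!     their_bits: &[bool],
//!     connected: &HashSet<u64>,
//!     peer_gossip_max_count: usize,
//! ) -> Option<Vec<u64>> {
//!     if sorted_set.len() != their_bits.len() {
//!         return None;
//!     }
//!     // Only peers we are connected to are eligible (we know, and can vouch for, their
//!     // address), and only those the sender marked as '0'.
//!     let selected: Vec<u64> = sorted_set
//!         .iter()
//!         .zip(their_bits)
//!         .filter(|(key, sender_knows)| !**sender_knows && connected.contains(*key))
//!         .map(|(key, _)| *key)
//!         .take(peer_gossip_max_count)
//!         .collect();
//!     Some(selected)
//! }
//!
//! let set = [10, 20, 30, 40];
//! let their_bits = [true, false, false, false];
//! let connected: HashSet<u64> = [20, 40].into_iter().collect();
//! assert_eq!(select_peers_to_gossip(&set, &their_bits, &connected, 8), Some(vec![20, 40]));
//! assert_eq!(select_peers_to_gossip(&set, &their_bits, &connected, 1), Some(vec![20]));
//! // The sender already knows everyone: nothing to send.
//! assert_eq!(select_peers_to_gossip(&set, &[true; 4], &connected, 8), Some(vec![]));
//! ```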
//!
//! If a peer receives a `PeerInfo` message (either directly or through gossip) containing a more
//! recent timestamp for a known peer's address, it updates its local `Record`. This updated
//! `PeerInfo` is also used in future gossip messages. Each peer generates its own signed
//! `PeerInfo` upon startup and sends it immediately after establishing a connection (following
//! the cryptographic handshake). This ensures that if a peer connects using an outdated address
//! record, it will be corrected promptly by the peer being dialed.
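//!
//! The update rule can be sketched as follows, assuming a hypothetical `PeerInfo` reduced to the
//! two fields that matter here; the signature check described above is assumed to have already
//! succeeded and is omitted. `update_record` is an illustrative name.
//!
//! ```rust
//! use std::net::SocketAddr;
//!
//! /// Hypothetical subset of the gossiped `PeerInfo` (public key and signature omitted).
//! #[derive(Clone, Debug, PartialEq)]
//! struct PeerInfo {
//!     socket: SocketAddr,
//!     timestamp: u64,
//! }
//!
//! /// Replaces the stored info only if the incoming one is strictly newer and returns
//! /// whether the local record changed.
//! fn update_record(stored: &mut Option<PeerInfo>, incoming: PeerInfo) -> bool {
//!     let is_newer = stored
//!         .as_ref()
//!         .map_or(true, |current| incoming.timestamp > current.timestamp);
//!     if is_newer {
//!         *stored = Some(incoming);
//!     }
//!     is_newer
//! }
//!
//! let old = PeerInfo { socket: "127.0.0.1:3000".parse().unwrap(), timestamp: 100 };
//! let new = PeerInfo { socket: "127.0.0.1:4000".parse().unwrap(), timestamp: 200 };
//! let mut record = Some(old.clone());
//! assert!(update_record(&mut record, new.clone()));
//! assert!(!update_record(&mut record, old)); // an older attestation is ignored
//! assert_eq!(record, Some(new));
//! ```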
//!
//! To initiate the discovery process, a peer needs a list of `bootstrappers` (defined in
//! [`Config`]): known peer public keys and their corresponding socket addresses. The peer
//! attempts to dial these bootstrappers, performs the handshake, sends its own `PeerInfo`, and
//! then sends a `BitVec` for the relevant peer set(s) (initially only knowing its own address,
//! marked as '1'). It then waits for responses, learning about other peers through the
//! `Payload::Peers` messages received. Bootstrapper information is persisted, and connections to
//! them are maintained even if they aren't part of any currently tracked peer sets. Different
//! peers can have different bootstrapper lists.
//!
//! _Note: If a peer (listener) receives a connection request from another peer (dialer) that
//! belongs to a registered peer set, the listener will accept the connection, even if the
//! listener itself hasn't yet learned about that specific peer set (or has an older version). The
//! core requirement is that the listener recognizes the *dialer's public key* as belonging to
//! *some* authorized set it tracks (see `actors::tracker::Actor`). This mechanism allows peers
//! with more up-to-date peer set information to connect and propagate that information, enabling
//! the listener to potentially learn about newer sets it is part of._
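//!
//! The acceptance check in the note above amounts to a membership test across every tracked set.
//! In this hypothetical sketch, keys are `u64`s and tracked sets live in a `BTreeMap` keyed by
//! index; `accept_inbound` is an illustrative name, not a function of this crate.
//!
//! ```rust
//! use std::collections::BTreeMap;
//!
//! /// Hypothetical: accept an inbound connection if the dialer's key appears in *any*
//! /// tracked peer set, whatever its index.
//! fn accept_inbound(tracked: &BTreeMap<u64, Vec<u64>>, dialer: u64) -> bool {
//!     tracked.values().any(|set| set.contains(&dialer))
//! }
//!
//! // The listener has only learned the peer set at index 4.
//! let mut tracked = BTreeMap::new();
//! tracked.insert(4, vec![1, 2, 3]);
//!
//! // Peer 2 may already know about a newer set, but it is accepted because it is in set 4.
//! assert!(accept_inbound(&tracked, 2));
//! // Peer 9 is not in any tracked set, so it is rejected.
//! assert!(!accept_inbound(&tracked, 9));
//! ```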
//!
//! ## Messages
//!
//! Application-level data is exchanged using the `Payload::Data` message type, which wraps an
//! internal `Data` structure. This structure contains:
//! - `channel`: A `u32` identifier used to route the message to the correct application handler.
//! - `message`: The arbitrary application payload as `Bytes`.
//!
//! The size of the `message` bytes (after potential compression) must not exceed the configured
//! `max_message_size`. If it does, the sending operation will fail with
//! [`Error::MessageTooLarge`]. Messages can be sent with `priority`, allowing certain
//! communications to potentially bypass lower-priority messages waiting in send queues across all
//! channels. Each registered channel ([`Sender`], [`Receiver`]) handles its own message queuing,
//! rate limiting, and optional `zstd` compression/decompression.
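//!
//! The size check and send prioritization can be sketched with a two-lane queue. This is a
//! hypothetical model, not this crate's implementation: it assumes a single outbound queue per
//! connection shared by all channels, treats `payload` as the already-compressed bytes, and uses
//! an illustrative `SendError` in place of [`Error`].
//!
//! ```rust
//! use std::collections::VecDeque;
//!
//! /// Hypothetical error mirroring `Error::MessageTooLarge`.
//! #[derive(Debug, PartialEq)]
//! enum SendError {
//!     MessageTooLarge(usize),
//! }
//!
//! /// Hypothetical outbound queue: priority messages are always drained first.
//! #[derive(Default)]
//! struct SendQueue {
//!     high: VecDeque<Vec<u8>>,
//!     low: VecDeque<Vec<u8>>,
//! }
//!
//! impl SendQueue {
//!     /// Rejects payloads larger than `max_message_size`, otherwise queues them by priority.
//!     fn enqueue(
//!         &mut self,
//!         payload: Vec<u8>,
//!         priority: bool,
//!         max_message_size: usize,
//!     ) -> Result<(), SendError> {
//!         if payload.len() > max_message_size {
//!             return Err(SendError::MessageTooLarge(payload.len()));
//!         }
//!         if priority {
//!             self.high.push_back(payload);
//!         } else {
//!             self.low.push_back(payload);
//!         }
//!         Ok(())
//!     }
//!
//!     /// Returns the next payload to write to the connection, if any.
//!     fn pop(&mut self) -> Option<Vec<u8>> {
//!         self.high.pop_front().or_else(|| self.low.pop_front())
//!     }
//! }
//!
//! let mut queue = SendQueue::default();
//! queue.enqueue(b"block".to_vec(), false, 1_024).unwrap();
//! queue.enqueue(b"vote".to_vec(), true, 1_024).unwrap();
//! assert_eq!(
//!     queue.enqueue(vec![0; 2_048], false, 1_024),
//!     Err(SendError::MessageTooLarge(2_048))
//! );
//! assert_eq!(queue.pop(), Some(b"vote".to_vec())); // the priority message jumps ahead
//! assert_eq!(queue.pop(), Some(b"block".to_vec()));
//! assert_eq!(queue.pop(), None);
//! ```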
//!
//! # Example
//!
//! ```rust
//! use commonware_p2p::{authenticated::{self, Network}, Sender, Recipients};
//! use commonware_cryptography::{ed25519, Signer, PrivateKey as _, PublicKey as _, PrivateKeyExt as _};
//! use commonware_runtime::{tokio, Spawner, Runner, Metrics};
//! use commonware_utils::NZU32;
//! use governor::Quota;
//! use std::net::{IpAddr, Ipv4Addr, SocketAddr};
//!
//! // Configure context
//! let runtime_cfg = tokio::Config::default();
//! let runner = tokio::Runner::new(runtime_cfg.clone());
//!
//! // Generate identity
//! //
//! // In production, the signer should be generated from a secure source of entropy.
//! let signer = ed25519::PrivateKey::from_seed(0);
//!
//! // Generate peers
//! //
//! // In production, peer identities will be provided by some external source of truth
//! // (like the staking set of a blockchain).
//! let peer1 = ed25519::PrivateKey::from_seed(1).public_key();
//! let peer2 = ed25519::PrivateKey::from_seed(2).public_key();
//! let peer3 = ed25519::PrivateKey::from_seed(3).public_key();
//!
//! // Configure bootstrappers
//! //
//! // In production, it is likely that the address of bootstrappers will be some public address.
//! let bootstrappers = vec![(peer1.clone(), SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3001))];
//!
//! // Configure namespace
//! //
//! // In production, use a unique application namespace to prevent cryptographic replay attacks.
//! let application_namespace = b"my-app-namespace";
//!
//! // Configure network
//! //
//! // In production, use a more conservative configuration like `Config::recommended`.
//! const MAX_MESSAGE_SIZE: usize = 1_024; // 1KB
//! let p2p_cfg = authenticated::Config::aggressive(
//!     signer.clone(),
//!     application_namespace,
//!     SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 3000),
//!     SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3000), // Use a specific dialable addr
//!     bootstrappers,
//!     MAX_MESSAGE_SIZE,
//! );
//!
//! // Start context
//! runner.start(|context| async move {
//!     // Initialize network
//!     let (mut network, mut oracle) = Network::new(context.with_label("network"), p2p_cfg);
//!
//!     // Register authorized peers
//!     //
//!     // In production, this would be updated as new peer sets are created (like when
//!     // the composition of a validator set changes).
//!     oracle.register(0, vec![signer.public_key(), peer1, peer2, peer3]).await;
//!
//!     // Register some channel
//!     const MAX_MESSAGE_BACKLOG: usize = 128;
//!     const COMPRESSION_LEVEL: Option<i32> = Some(3);
//!     let (mut sender, receiver) = network.register(
//!         0,
//!         Quota::per_second(NZU32!(1)),
//!         MAX_MESSAGE_BACKLOG,
//!         COMPRESSION_LEVEL,
//!     );
//!
//!     // Run network
//!     let network_handler = network.start();
//!
//!     // Example: Use sender
//!     let _ = sender.send(Recipients::All, bytes::Bytes::from_static(b"hello"), false).await;
//!
//!     // Shutdown network
//!     network_handler.abort();
//! });
//! ```

mod actors;
mod channels;
mod config;
mod ip;
mod metrics;
mod network;
mod types;

use thiserror::Error;

/// Errors that can occur when interacting with the network.
#[derive(Error, Debug)]
pub enum Error {
    #[error("message too large: {0}")]
    MessageTooLarge(usize),
    #[error("compression failed")]
    CompressionFailed,
    #[error("decompression failed")]
    DecompressionFailed,
    #[error("network closed")]
    NetworkClosed,
}

pub use actors::tracker::Oracle;
pub use channels::{Receiver, Sender};
pub use config::{Bootstrapper, Config};
pub use network::Network;

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Receiver, Recipients, Sender};
    use commonware_cryptography::{ed25519, PrivateKeyExt as _, Signer as _};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        deterministic, tokio, Clock, Metrics, Network as RNetwork, Runner, Spawner,
    };
    use commonware_utils::NZU32;
    use governor::{clock::ReasonablyRealtime, Quota};
    use rand::{CryptoRng, Rng};
    use std::collections::HashSet;
    use std::{
        net::{IpAddr, Ipv4Addr, SocketAddr},
        time::Duration,
    };

    #[derive(Copy, Clone)]
    enum Mode {
        All,
        Some,
        One,
    }

    const DEFAULT_MESSAGE_BACKLOG: usize = 128;

    /// Test connectivity between `n` peers.
    ///
    /// We set a unique `base_port` for each test to avoid "address already in use"
    /// errors when tests are run immediately after each other.
    async fn run_network(
        context: impl Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics,
        max_message_size: usize,
        base_port: u16,
        n: usize,
        mode: Mode,
    ) {
        // Create peers
        let mut peers = Vec::new();
        for i in 0..n {
            peers.push(ed25519::PrivateKey::from_seed(i as u64));
        }
        let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

        // Create networks
        let mut waiters = Vec::new();
        for (i, peer) in peers.iter().enumerate() {
            // Create peer context
            let context = context.with_label(&format!("peer-{}", i));

            // Derive port
            let port = base_port + i as u16;

            // Create bootstrappers
            let mut bootstrappers = Vec::new();
            if i > 0 {
                bootstrappers.push((
                    addresses[0].clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
                ));
            }

            // Create network
            let signer = peer.clone();
            let config = Config::test(
                signer.clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                bootstrappers,
                max_message_size,
            );
            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

            // Register peers
            oracle.register(0, addresses.clone()).await;

            // Register basic application
            let (mut sender, mut receiver) = network.register(
                0,
                Quota::per_second(NZU32!(5)), // Ensure we hit the rate limit
                DEFAULT_MESSAGE_BACKLOG,
                None,
            );

            // Wait to connect to all peers, and then send messages to everyone
            network.start();

            // Send/Receive messages
            let handler = context.with_label("agent").spawn({
                let addresses = addresses.clone();
                move |context| async move {
                    // Wait for all peers to send their identity
                    let acker = context.with_label("receiver").spawn(move |_| async move {
                        let mut received = HashSet::new();
                        while received.len() < n - 1 {
                            // Ensure message equals sender identity
                            let (sender, message) = receiver.recv().await.unwrap();
                            assert_eq!(sender.as_ref(), message.as_ref());

                            // Add to received set
                            received.insert(sender);
                        }
                    });

                    // Send identity to all peers
                    let msg = signer.public_key();
                    match mode {
                        Mode::One => {
                            for (j, recipient) in addresses.iter().enumerate() {
                                // Don't send message to self
                                if i == j {
                                    continue;
                                }

                                // Loop until success
                                loop {
                                    let sent = sender
                                        .send(
                                            Recipients::One(recipient.clone()),
                                            msg.to_vec().into(),
                                            true,
                                        )
                                        .await
                                        .unwrap();
                                    if sent.len() != 1 {
                                        context.sleep(Duration::from_millis(100)).await;
                                        continue;
                                    }
                                    assert_eq!(&sent[0], recipient);
                                    break;
                                }
                            }
                        }
                        Mode::Some => {
                            // Get all peers not including self
                            let mut recipients = addresses.clone();
                            recipients.remove(i);
                            recipients.sort();

                            // Loop until sends to all peers succeed
                            loop {
                                let mut sent = sender
                                    .send(
                                        Recipients::Some(recipients.clone()),
                                        msg.to_vec().into(),
                                        true,
                                    )
                                    .await
                                    .unwrap();
                                if sent.len() != n - 1 {
                                    context.sleep(Duration::from_millis(100)).await;
                                    continue;
                                }

                                // Compare to expected
                                sent.sort();
                                assert_eq!(sent, recipients);
                                break;
                            }
                        }
                        Mode::All => {
                            // Get all peers not including self
                            let mut recipients = addresses.clone();
                            recipients.remove(i);
                            recipients.sort();

                            // Loop until sends to all peers succeed
                            loop {
                                let mut sent = sender
                                    .send(Recipients::All, msg.to_vec().into(), true)
                                    .await
                                    .unwrap();
                                if sent.len() != n - 1 {
                                    context.sleep(Duration::from_millis(100)).await;
                                    continue;
                                }

                                // Compare to expected
                                sent.sort();
                                assert_eq!(sent, recipients);
                                break;
                            }
                        }
                    };

                    // Wait for all peers to send their identity
                    acker.await.unwrap();
                }
            });

            // Add to waiters
            waiters.push(handler);
        }

        // Wait for all peers to finish
        for waiter in waiters {
            waiter.await.unwrap();
        }
    }

    fn run_deterministic_test(seed: u64, mode: Mode) {
        // Configure test
        const MAX_MESSAGE_SIZE: usize = 1_024 * 1_024; // 1MB
        const NUM_PEERS: usize = 25;
        const BASE_PORT: u16 = 3000;

        // Run first instance
        let executor = deterministic::Runner::seeded(seed);
        let state = executor.start(|context| async move {
            run_network(
                context.clone(),
                MAX_MESSAGE_SIZE,
                BASE_PORT,
                NUM_PEERS,
                mode,
            )
            .await;
            context.auditor().state()
        });

        // Compare result to second instance
        let executor = deterministic::Runner::seeded(seed);
        let state2 = executor.start(|context| async move {
            run_network(
                context.clone(),
                MAX_MESSAGE_SIZE,
                BASE_PORT,
                NUM_PEERS,
                mode,
            )
            .await;
            context.auditor().state()
        });
        assert_eq!(state, state2);
    }

    #[test_traced]
    #[ignore]
    fn test_determinism_one() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::One);
        }
    }

    #[test_traced]
    #[ignore]
    fn test_determinism_some() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::Some);
        }
    }

    #[test_traced]
    #[ignore]
    fn test_determinism_all() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::All);
        }
    }

    #[test_traced]
    fn test_tokio_connectivity() {
        let cfg = tokio::Config::default();
        let executor = tokio::Runner::new(cfg.clone());
        executor.start(|context| async move {
            const MAX_MESSAGE_SIZE: usize = 1_024 * 1_024; // 1MB
            let base_port = 3000;
            let n = 10;
            run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
        });
    }

    #[test_traced]
    fn test_multi_index_oracle() {
        // Configure test
        let base_port = 3000;
        let n: usize = 100;

        // Initialize context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Create networks
            let mut waiters = Vec::new();
            for (i, peer) in peers.iter().enumerate() {
                // Create peer context
                let context = context.with_label(&format!("peer-{}", i));

                // Derive port
                let port = base_port + i as u16;

                // Create bootstrappers
                let mut bootstrappers = Vec::new();
                if i > 0 {
                    bootstrappers.push((
                        addresses[0].clone(),
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
                    ));
                }

                // Create network
                let signer = peer.clone();
                let config = Config::test(
                    signer.clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                    bootstrappers,
                    1_024 * 1_024, // 1MB
                );
                let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

                // Register peers at separate indices
                oracle.register(0, vec![addresses[0].clone()]).await;
                oracle
                    .register(1, vec![addresses[1].clone(), addresses[2].clone()])
                    .await;
                oracle
                    .register(2, addresses.iter().skip(2).cloned().collect())
                    .await;

                // Register basic application
                let (mut sender, mut receiver) = network.register(
                    0,
                    Quota::per_second(NZU32!(10)),
                    DEFAULT_MESSAGE_BACKLOG,
                    None,
                );

                // Wait to connect to all peers, and then send messages to everyone
                network.start();

                // Send/Receive messages
                let handler = context
                    .with_label("agent")
                    .spawn(move |context| async move {
                        if i == 0 {
                            // Loop until success
                            let msg = signer.public_key();
                            loop {
                                if sender
                                    .send(Recipients::All, msg.to_vec().into(), true)
                                    .await
                                    .unwrap()
                                    .len()
                                    == n - 1
                                {
                                    break;
                                }

                                // Sleep and try again (avoid busy loop)
                                context.sleep(Duration::from_millis(100)).await;
                            }
                        } else {
                            // Ensure message equals sender identity
                            let (sender, message) = receiver.recv().await.unwrap();
                            assert_eq!(sender.as_ref(), message.as_ref());
                        }
                    });

                // Add to waiters
                waiters.push(handler);
            }

            // Wait for waiters to finish (receiver before sender)
            for waiter in waiters.into_iter().rev() {
                waiter.await.unwrap();
            }
        });
    }

    fn test_message_too_large(compression: Option<i32>) {
        // Configure test
        let base_port = 3000;
        let n: usize = 2;

        // Initialize context
        let executor = deterministic::Runner::seeded(0);
        executor.start(|mut context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Create network
            let signer = peers[0].clone();
            let config = Config::test(
                signer.clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
                Vec::new(),
                1_024 * 1_024, // 1MB
            );
            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

            // Register peers
            oracle.register(0, addresses.clone()).await;

            // Register basic application
            let (mut sender, _) = network.register(
                0,
                Quota::per_second(NZU32!(10)),
                DEFAULT_MESSAGE_BACKLOG,
                compression,
            );

            // Start network
            network.start();

            // Create random message
            let mut msg = vec![0u8; 10 * 1024 * 1024]; // 10MB (greater than frame capacity)
            context.fill(&mut msg[..]);

            // Send message
            let recipient = Recipients::One(addresses[1].clone());
            let result = sender.send(recipient, msg.into(), true).await;
            assert!(matches!(result, Err(Error::MessageTooLarge(_))));
        });
    }

    #[test_traced]
    fn test_message_too_large_no_compression() {
        test_message_too_large(None);
    }

    #[test_traced]
    fn test_message_too_large_compression() {
        test_message_too_large(Some(3));
    }
}