commonware_p2p/authenticated/lookup/mod.rs
1//! Communicate with a fixed set of authenticated peers with known addresses over encrypted connections.
2//!
3//! `lookup` provides multiplexed communication between fully-connected peers
4//! identified by a developer-specified cryptographic identity (i.e. BLS, ed25519, etc.).
5//! Unlike `discovery`, peers in `lookup` don't use a discovery mechanism to find each other;
6//! each peer's address is supplied by the application layer.
7//!
8//! # Features
9//!
10//! - Configurable Cryptography Scheme for Peer Identities (BLS, ed25519, etc.)
11//! - Multiplexing With Configurable Rate Limiting Per Channel and Send Prioritization
12//!
13//! # Design
14//!
15//! ## Discovery
16//!
17//! This module operates under the assumption that all peers are aware of and synchronized on
18//! the composition of peer sets at specific, user-provided indices (`u64`). Each index maps to a
19//! list of peer `PublicKey`/`SocketAddr` pairs (`(u64, Vec<(PublicKey, SocketAddr)>)`).
20//!
21//! On startup, the application supplies the initial set of peers. The `Oracle` actor allows
22//! the application to update peer --> address mappings so that peers can find each other.
23//!
//! Any inbound connection attempt from an IP address that is not in the union of all registered
//! peer sets is rejected.
26//!
27//! ## Messages
28//!
29//! Application-level data is exchanged using the `Data` message type. This structure contains:
30//! - `channel`: A `u32` identifier used to route the message to the correct application handler.
31//! - `message`: The arbitrary application payload as `Bytes`.
32//!
33//! The size of the `message` bytes must not exceed the configured
34//! `max_message_size`. If it does, the sending operation will fail with
35//! [Error::MessageTooLarge]. Messages can be sent with `priority`, allowing certain
36//! communications to potentially bypass lower-priority messages waiting in send queues across all
37//! channels. Each registered channel ([Sender], [Receiver]) handles its own message queuing
38//! and rate limiting.
39//!
40//! ## Compression
41//!
42//! Stream compression is not provided at the transport layer to avoid inadvertently
43//! enabling known attacks such as BREACH and CRIME. These attacks exploit the interaction
44//! between compression and encryption by analyzing patterns in the resulting data.
45//! By compressing secrets alongside attacker-controlled content, these attacks can infer
46//! sensitive information through compression ratio analysis. Applications that choose
47//! to compress data should do so with full awareness of these risks and implement
48//! appropriate mitigations (such as ensuring no attacker-controlled data is compressed
49//! alongside sensitive information).
50//!
51//! ## Rate Limiting
52//!
53//! There are five primary rate limits:
54//!
55//! - `max_concurrent_handshakes`: The maximum number of concurrent handshake attempts allowed.
56//! - `allowed_handshake_rate_per_ip`: The rate limit for handshake attempts originating from a single IP address.
57//! - `allowed_handshake_rate_per_subnet`: The rate limit for handshake attempts originating from a single IP subnet.
58//! - `allowed_connection_rate_per_peer`: The rate limit for connections to a single peer (incoming or outgoing).
59//! - `rate` (per channel): The rate limit for messages sent on a single channel.
60//!
61//! _Users should consider these rate limits as best-effort protection against moderate abuse. Targeted abuse (e.g. DDoS)
62//! must be mitigated with an external proxy (that limits inbound connection attempts to authorized IPs)._
63//!
64//! # Example
65//!
66//! ```rust
67//! use commonware_p2p::{authenticated::lookup::{self, Network}, Manager, Sender, Recipients};
68//! use commonware_cryptography::{ed25519, Signer, PrivateKey as _, PublicKey as _, PrivateKeyExt as _};
69//! use commonware_runtime::{deterministic, Spawner, Runner, Metrics};
70//! use commonware_utils::{NZU32, set::OrderedAssociated};
71//! use governor::Quota;
72//! use std::net::{IpAddr, Ipv4Addr, SocketAddr};
73//!
74//! // Configure context
75//! let runtime_cfg = deterministic::Config::default();
76//! let runner = deterministic::Runner::new(runtime_cfg.clone());
77//!
78//! // Generate identity
79//! //
80//! // In production, the signer should be generated from a secure source of entropy.
81//! let my_sk = ed25519::PrivateKey::from_seed(0);
82//! let my_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3000);
83//!
84//! // Generate peers
85//! //
86//! // In production, peer identities will be provided by some external source of truth
87//! // (like the staking set of a blockchain).
88//! let peer1 = ed25519::PrivateKey::from_seed(1).public_key();
89//! let peer1_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3001);
90//! let peer2 = ed25519::PrivateKey::from_seed(2).public_key();
91//! let peer2_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3002);
92//! let peer3 = ed25519::PrivateKey::from_seed(3).public_key();
93//! let peer3_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3003);
94//!
95//! // Configure namespace
96//! //
97//! // In production, use a unique application namespace to prevent cryptographic replay attacks.
98//! let application_namespace = b"my-app-namespace";
99//!
100//! // Configure network
101//! //
102//! // In production, use a more conservative configuration like `Config::recommended`.
103//! const MAX_MESSAGE_SIZE: usize = 1_024; // 1KB
104//! let p2p_cfg = lookup::Config::local(
105//! my_sk.clone(),
106//! application_namespace,
107//! my_addr,
108//! my_addr,
109//! MAX_MESSAGE_SIZE,
110//! );
111//!
112//! // Start context
113//! runner.start(|context| async move {
114//! // Initialize network
115//! let (mut network, mut oracle) = Network::new(context.with_label("network"), p2p_cfg);
116//!
117//! // Register authorized peers
118//! //
119//! // In production, this would be updated as new peer sets are created (like when
120//! // the composition of a validator set changes).
121//! oracle.update(
122//! 0,
123//! OrderedAssociated::from([(my_sk.public_key(), my_addr), (peer1, peer1_addr), (peer2, peer2_addr), (peer3, peer3_addr)])
124//! ).await;
125//!
126//! // Register some channel
127//! const MAX_MESSAGE_BACKLOG: usize = 128;
128//! let (mut sender, receiver) = network.register(
129//! 0,
130//! Quota::per_second(NZU32!(1)),
131//! MAX_MESSAGE_BACKLOG,
132//! );
133//!
134//! // Run network
135//! let network_handler = network.start();
136//!
137//! // Example: Use sender
138//! let _ = sender.send(Recipients::All, bytes::Bytes::from_static(b"hello"), false).await;
139//!
140//! // Shutdown network
141//! network_handler.abort();
142//! });
143//! ```
144
145mod actors;
146mod channels;
147mod config;
148mod metrics;
149mod network;
150mod types;
151
152use thiserror::Error;
153
154/// Errors that can occur when interacting with the network.
155#[derive(Error, Debug)]
156pub enum Error {
157 #[error("message too large: {0}")]
158 MessageTooLarge(usize),
159 #[error("network closed")]
160 NetworkClosed,
161}
162
163pub use actors::tracker::Oracle;
164pub use channels::{Receiver, Sender};
165pub use config::Config;
166pub use network::Network;
167
168#[cfg(test)]
169mod tests {
170 use super::*;
171 use crate::{Manager, Receiver, Recipients, Sender};
172 use commonware_cryptography::{ed25519, PrivateKeyExt as _, Signer as _};
173 use commonware_macros::{select, test_traced};
174 use commonware_runtime::{
175 deterministic, tokio, Clock, Metrics, Network as RNetwork, Runner, Spawner,
176 };
177 use commonware_utils::{set::OrderedAssociated, NZU32};
178 use futures::{channel::mpsc, SinkExt, StreamExt};
179 use governor::{clock::ReasonablyRealtime, Quota};
180 use rand::{CryptoRng, Rng};
181 use std::{
182 collections::HashSet,
183 net::{IpAddr, Ipv4Addr, SocketAddr},
184 time::Duration,
185 };
186
/// Strategy the connectivity tests use to address the other peers when sending.
#[derive(Clone, Copy)]
enum Mode {
    /// Send a single broadcast using `Recipients::All`.
    All,
    /// Send once using `Recipients::Some` with an explicit list of every other peer.
    Some,
    /// Send to each other peer individually using `Recipients::One`.
    One,
}

/// Message backlog handed to `Network::register` for every channel created in these tests.
const DEFAULT_MESSAGE_BACKLOG: usize = 128;
195
196 /// Ensure no message rate limiting occurred.
197 ///
198 /// If a message is rate limited, it would be formatted as:
199 ///
200 /// ```text
201 /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="data_0"} 1
202 /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="ping"} 1
203 /// ```
204 fn assert_no_rate_limiting(context: &impl Metrics) {
205 let metrics = context.encode();
206 assert!(
207 !metrics.contains("messages_rate_limited_total{"),
208 "no messages should be rate limited: {metrics}"
209 );
210 }
211
212 /// Test connectivity between `n` peers.
213 ///
214 /// We set a unique `base_port` for each test to avoid "address already in use"
215 /// errors when tests are run immediately after each other.
216 async fn run_network(
217 context: impl Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics,
218 max_message_size: usize,
219 base_port: u16,
220 n: usize,
221 mode: Mode,
222 ) {
223 // Create peers
224 let mut peers_and_sks = Vec::new();
225 for i in 0..n {
226 let private_key = ed25519::PrivateKey::from_seed(i as u64);
227 let public_key = private_key.public_key();
228 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
229 peers_and_sks.push((private_key, public_key, address));
230 }
231 let peers = peers_and_sks
232 .iter()
233 .map(|(_, pub_key, addr)| (pub_key.clone(), *addr))
234 .collect::<Vec<_>>();
235
236 // Create networks
237 let (complete_sender, mut complete_receiver) = mpsc::channel(peers.len());
238 for (i, (private_key, public_key, address)) in peers_and_sks.iter().enumerate() {
239 let public_key = public_key.clone();
240
241 // Create peer context
242 let context = context.with_label(&format!("peer-{i}"));
243
244 // Create network
245 let config = Config::test(private_key.clone(), *address, max_message_size);
246 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
247
248 // Register peers
249 oracle
250 .update(0, OrderedAssociated::from(peers.clone()))
251 .await;
252
253 // Register basic application
254 let (mut sender, mut receiver) =
255 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
256
257 // Wait to connect to all peers, and then send messages to everyone
258 network.start();
259
260 // Send/Receive messages
261 let mut public_keys = peers
262 .iter()
263 .filter_map(|(pk, _)| {
264 if pk != &public_key {
265 Some(pk.clone())
266 } else {
267 None
268 }
269 })
270 .collect::<Vec<_>>();
271 public_keys.sort();
272 context.with_label("agent").spawn({
273 let mut complete_sender = complete_sender.clone();
274 let peers = peers.clone();
275 move |context| async move {
276 // Wait for all peers to send their identity
277 let receiver = context.with_label("receiver").spawn(move |_| async move {
278 // Wait for all peers to send their identity
279 let mut received = HashSet::new();
280 while received.len() < n - 1 {
281 // Ensure message equals sender identity
282 let (sender, message) = receiver.recv().await.unwrap();
283 assert_eq!(sender.as_ref(), message.as_ref());
284
285 // Add to received set
286 received.insert(sender);
287 }
288 complete_sender.send(()).await.unwrap();
289
290 // Process messages until all finished (or else sender loops could get stuck as a peer may drop)
291 loop {
292 receiver.recv().await.unwrap();
293 }
294 });
295
296 // Send identity to all peers
297 let sender = context
298 .with_label("sender")
299 .spawn(move |context| async move {
300 // Loop forever to account for unexpected message drops
301 loop {
302 match mode {
303 Mode::One => {
304 for (j, (pub_key, _)) in peers.iter().enumerate() {
305 // Don't send message to self
306 if i == j {
307 continue;
308 }
309
310 // Loop until success
311 loop {
312 let sent = sender
313 .send(
314 Recipients::One(pub_key.clone()),
315 public_key.to_vec().into(),
316 true,
317 )
318 .await
319 .unwrap();
320 if sent.len() != 1 {
321 context.sleep(Duration::from_millis(100)).await;
322 continue;
323 }
324 assert_eq!(&sent[0], pub_key);
325 break;
326 }
327 }
328 }
329 Mode::Some => {
330 // Get all peers not including self
331 let mut recipients = peers.clone();
332 recipients.remove(i);
333 recipients.sort();
334
335 // Loop until all peer sends successful
336 loop {
337 let mut sent = sender
338 .send(
339 Recipients::Some(public_keys.clone()),
340 public_key.to_vec().into(),
341 true,
342 )
343 .await
344 .unwrap();
345 if sent.len() != n - 1 {
346 context.sleep(Duration::from_millis(100)).await;
347 continue;
348 }
349
350 // Compare to expected
351 sent.sort();
352 assert_eq!(sent, public_keys);
353 break;
354 }
355 }
356 Mode::All => {
357 // Get all peers not including self
358 let mut recipients = peers.clone();
359 recipients.remove(i);
360 recipients.sort();
361
362 // Loop until all peer sends successful
363 loop {
364 let mut sent = sender
365 .send(
366 Recipients::All,
367 public_key.to_vec().into(),
368 true,
369 )
370 .await
371 .unwrap();
372 if sent.len() != n - 1 {
373 context.sleep(Duration::from_millis(100)).await;
374 continue;
375 }
376
377 // Compare to expected
378 sent.sort();
379 assert_eq!(sent, public_keys);
380 break;
381 }
382 }
383 };
384
385 // Sleep to avoid busy loop
386 context.sleep(Duration::from_secs(10)).await;
387 }
388 });
389
390 // Neither task should exit
391 select! {
392 receiver = receiver => {
393 panic!("receiver exited: {receiver:?}");
394 },
395 sender = sender => {
396 panic!("sender exited: {sender:?}");
397 },
398 }
399 }
400 });
401 }
402
403 // Wait for all peers to finish
404 for _ in 0..n {
405 complete_receiver.next().await.unwrap();
406 }
407
408 // Ensure no message rate limiting occurred
409 assert_no_rate_limiting(&context);
410 }
411
412 fn run_deterministic_test(seed: u64, mode: Mode) {
413 // Configure test
414 const MAX_MESSAGE_SIZE: usize = 1_024 * 1_024; // 1MB
415 const NUM_PEERS: usize = 25;
416 const BASE_PORT: u16 = 3000;
417
418 // Run first instance
419 let executor = deterministic::Runner::seeded(seed);
420 let state = executor.start(|context| async move {
421 run_network(
422 context.clone(),
423 MAX_MESSAGE_SIZE,
424 BASE_PORT,
425 NUM_PEERS,
426 mode,
427 )
428 .await;
429 context.auditor().state()
430 });
431
432 // Compare result to second instance
433 let executor = deterministic::Runner::seeded(seed);
434 let state2 = executor.start(|context| async move {
435 run_network(
436 context.clone(),
437 MAX_MESSAGE_SIZE,
438 BASE_PORT,
439 NUM_PEERS,
440 mode,
441 )
442 .await;
443 context.auditor().state()
444 });
445 assert_eq!(state, state2);
446 }
447
448 #[test_traced]
449 #[ignore]
450 fn test_determinism_one() {
451 for i in 0..10 {
452 run_deterministic_test(i, Mode::One);
453 }
454 }
455
456 #[test_traced]
457 #[ignore]
458 fn test_determinism_some() {
459 for i in 0..10 {
460 run_deterministic_test(i, Mode::Some);
461 }
462 }
463
464 #[test_traced]
465 #[ignore]
466 fn test_determinism_all() {
467 for i in 0..10 {
468 run_deterministic_test(i, Mode::All);
469 }
470 }
471
472 #[test_traced]
473 fn test_tokio_connectivity() {
474 let executor = tokio::Runner::default();
475 executor.start(|context| async move {
476 const MAX_MESSAGE_SIZE: usize = 1_024 * 1_024; // 1MB
477 let base_port = 4000;
478 let n = 10;
479 run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
480 });
481 }
482
483 #[test_traced]
484 fn test_multi_index_oracle() {
485 // Configure test
486 let base_port = 3000;
487 let n: usize = 10;
488
489 // Initialize context
490 let executor = deterministic::Runner::default();
491 executor.start(|context| async move {
492 // Create peers
493 let mut peers_and_sks = Vec::new();
494 for i in 0..n {
495 let sk = ed25519::PrivateKey::from_seed(i as u64);
496 let pk = sk.public_key();
497 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
498 peers_and_sks.push((sk, pk, addr));
499 }
500 let peers = peers_and_sks
501 .iter()
502 .map(|(_, pk, addr)| (pk.clone(), *addr))
503 .collect::<Vec<_>>();
504
505 // Create networks
506 let mut waiters = Vec::new();
507 for (i, (peer_sk, peer_pk, peer_addr)) in peers_and_sks.iter().enumerate() {
508 // Create peer context
509 let context = context.with_label(&format!("peer-{i}"));
510
511 // Create network
512 let config = Config::test(
513 peer_sk.clone(),
514 *peer_addr,
515 1_024 * 1_024, // 1MB
516 );
517 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
518
519 // Register peers at separate indices
520 oracle
521 .update(0, OrderedAssociated::from([peers[0].clone()]))
522 .await;
523 oracle
524 .update(
525 1,
526 OrderedAssociated::from([peers[1].clone(), peers[2].clone()]),
527 )
528 .await;
529 oracle
530 .update(2, peers.iter().skip(2).cloned().collect())
531 .await;
532
533 // Register basic application
534 let (mut sender, mut receiver) =
535 network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
536
537 // Wait to connect to all peers, and then send messages to everyone
538 network.start();
539
540 // Send/Receive messages
541 let msg = peer_pk.clone();
542 let handler = context
543 .with_label("agent")
544 .spawn(move |context| async move {
545 if i == 0 {
546 // Loop until success
547 loop {
548 if sender
549 .send(Recipients::All, msg.to_vec().into(), true)
550 .await
551 .unwrap()
552 .len()
553 == n - 1
554 {
555 break;
556 }
557
558 // Sleep and try again (avoid busy loop)
559 context.sleep(Duration::from_millis(100)).await;
560 }
561 } else {
562 // Ensure message equals sender identity
563 let (sender, message) = receiver.recv().await.unwrap();
564 assert_eq!(sender.as_ref(), message.as_ref());
565 }
566 });
567
568 // Add to waiters
569 waiters.push(handler);
570 }
571
572 // Wait for waiters to finish (receiver before sender)
573 for waiter in waiters.into_iter().rev() {
574 waiter.await.unwrap();
575 }
576
577 // Ensure no message rate limiting occurred
578 assert_no_rate_limiting(&context);
579 });
580 }
581
582 #[test_traced]
583 fn test_message_too_large() {
584 // Configure test
585 let base_port = 3000;
586 let n: usize = 2;
587
588 // Initialize context
589 let executor = deterministic::Runner::seeded(0);
590 executor.start(|mut context| async move {
591 // Create peers
592 let mut peers_and_sks = Vec::new();
593 for i in 0..n {
594 let peer_sk = ed25519::PrivateKey::from_seed(i as u64);
595 let peer_pk = peer_sk.public_key();
596 let peer_addr =
597 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
598 peers_and_sks.push((peer_sk, peer_pk, peer_addr));
599 }
600 let peers: OrderedAssociated<_, _> = peers_and_sks
601 .iter()
602 .map(|(_, pk, addr)| (pk.clone(), *addr))
603 .collect();
604
605 // Create network
606 let (sk, _, addr) = peers_and_sks[0].clone();
607 let config = Config::test(
608 sk,
609 addr,
610 1_024 * 1_024, // 1MB
611 );
612 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
613
614 // Register peers
615 oracle.update(0, peers.clone()).await;
616
617 // Register basic application
618 let (mut sender, _) =
619 network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
620
621 // Wait to connect to all peers, and then send messages to everyone
622 network.start();
623
624 // Crate random message
625 let mut msg = vec![0u8; 10 * 1024 * 1024]; // 10MB (greater than frame capacity)
626 context.fill(&mut msg[..]);
627
628 // Send message
629 let recipient = Recipients::One(peers[1].clone());
630 let result = sender.send(recipient, msg.into(), true).await;
631 assert!(matches!(result, Err(Error::MessageTooLarge(_))));
632 });
633 }
634
635 #[test_traced]
636 #[should_panic(expected = "no messages should be rate limited")]
637 fn test_rate_limiting() {
638 // Configure test
639 let base_port = 3000;
640 let n: usize = 2;
641
642 // Initialize context
643 let executor = deterministic::Runner::seeded(0);
644 executor.start(|context| async move {
645 // Create peers
646 let mut peers_and_sks = Vec::new();
647 for i in 0..n {
648 let sk = ed25519::PrivateKey::from_seed(i as u64);
649 let pk = sk.public_key();
650 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
651 peers_and_sks.push((sk, pk, addr));
652 }
653 let peers: OrderedAssociated<_, _> = peers_and_sks
654 .iter()
655 .map(|(_, pk, addr)| (pk.clone(), *addr))
656 .collect();
657 let (sk0, _, addr0) = peers_and_sks[0].clone();
658 let (sk1, pk1, addr1) = peers_and_sks[1].clone();
659
660 // Create network for peer 0
661 let config0 = Config::test(sk0, addr0, 1_024 * 1_024); // 1MB
662 let (mut network0, mut oracle0) = Network::new(context.with_label("peer-0"), config0);
663 oracle0.update(0, peers.clone()).await;
664 let (mut sender0, _receiver0) =
665 network0.register(0, Quota::per_hour(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
666 network0.start();
667
668 // Create network for peer 1
669 let config1 = Config::test(sk1, addr1, 1_024 * 1_024); // 1MB
670 let (mut network1, mut oracle1) = Network::new(context.with_label("peer-1"), config1);
671 oracle1.update(0, peers.clone()).await;
672 let (_sender1, _receiver1) =
673 network1.register(0, Quota::per_hour(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
674 network1.start();
675
676 // Send first message, which should be allowed and consume the quota.
677 let msg = vec![0u8; 1024]; // 1KB
678 loop {
679 // Confirm message is sent to peer
680 let sent = sender0
681 .send(Recipients::One(pk1.clone()), msg.clone().into(), true)
682 .await
683 .unwrap();
684 if !sent.is_empty() {
685 break;
686 }
687
688 // Sleep and try again (avoid busy loop)
689 context.sleep(Duration::from_millis(100)).await;
690 }
691
692 // Immediately send the second message to trigger the rate limit.
693 let sent = sender0
694 .send(Recipients::One(pk1), msg.into(), true)
695 .await
696 .unwrap();
697 assert!(!sent.is_empty());
698
699 // Loop until the metrics reflect the rate-limited message.
700 for _ in 0..10 {
701 assert_no_rate_limiting(&context);
702 context.sleep(Duration::from_millis(100)).await;
703 }
704 });
705 }
706}