commonware_p2p/authenticated/lookup/mod.rs
1//! Communicate with a fixed set of authenticated peers with known addresses over encrypted connections.
2//!
3//! `lookup` provides multiplexed communication between fully-connected peers
4//! identified by a developer-specified cryptographic identity (i.e. BLS, ed25519, etc.).
5//! Unlike `discovery`, peers in `lookup` don't use a discovery mechanism to find each other;
6//! each peer's address is supplied by the application layer.
7//!
8//! # Features
9//!
10//! - Configurable Cryptography Scheme for Peer Identities (BLS, ed25519, etc.)
11//! - Multiplexing With Configurable Rate Limiting Per Channel and Send Prioritization
12//!
13//! # Design
14//!
15//! ## Discovery
16//!
17//! This module operates under the assumption that all peers are aware of and synchronized on
18//! the composition of peer sets at specific, user-provided indices (`u64`). Each index maps to a
19//! list of peer `PublicKey`/`SocketAddr` pairs (`(u64, Vec<(PublicKey, SocketAddr)>)`).
20//!
21//! On startup, the application supplies the initial set of peers. The `Oracle` actor allows
22//! the application to update peer --> address mappings so that peers can find each other.
23//!
24//! ## Messages
25//!
26//! Application-level data is exchanged using the `Data` message type. This structure contains:
27//! - `channel`: A `u32` identifier used to route the message to the correct application handler.
28//! - `message`: The arbitrary application payload as `Bytes`.
29//!
30//! The size of the `message` bytes must not exceed the configured
31//! `max_message_size`. If it does, the sending operation will fail with
32//! [Error::MessageTooLarge]. Messages can be sent with `priority`, allowing certain
33//! communications to potentially bypass lower-priority messages waiting in send queues across all
34//! channels. Each registered channel ([Sender], [Receiver]) handles its own message queuing
35//! and rate limiting.
36//!
37//! ## Compression
38//!
39//! Stream compression is not provided at the transport layer to avoid inadvertently
40//! enabling known attacks such as BREACH and CRIME. These attacks exploit the interaction
41//! between compression and encryption by analyzing patterns in the resulting data.
42//! By compressing secrets alongside attacker-controlled content, these attacks can infer
43//! sensitive information through compression ratio analysis. Applications that choose
44//! to compress data should do so with full awareness of these risks and implement
45//! appropriate mitigations (such as ensuring no attacker-controlled data is compressed
46//! alongside sensitive information).
47//!
48//! # Example
49//!
50//! ```rust
51//! use commonware_p2p::{authenticated::lookup::{self, Network}, Sender, Recipients};
52//! use commonware_cryptography::{ed25519, Signer, PrivateKey as _, PublicKey as _, PrivateKeyExt as _};
53//! use commonware_runtime::{tokio, Spawner, Runner, Metrics};
54//! use commonware_utils::NZU32;
55//! use governor::Quota;
56//! use std::net::{IpAddr, Ipv4Addr, SocketAddr};
57//!
58//! // Configure context
59//! let runtime_cfg = tokio::Config::default();
60//! let runner = tokio::Runner::new(runtime_cfg.clone());
61//!
62//! // Generate identity
63//! //
64//! // In production, the signer should be generated from a secure source of entropy.
65//! let my_sk = ed25519::PrivateKey::from_seed(0);
66//! let my_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3000);
67//!
68//! // Generate peers
69//! //
70//! // In production, peer identities will be provided by some external source of truth
71//! // (like the staking set of a blockchain).
72//! let peer1 = ed25519::PrivateKey::from_seed(1).public_key();
73//! let peer1_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3001);
74//! let peer2 = ed25519::PrivateKey::from_seed(2).public_key();
75//! let peer2_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3002);
76//! let peer3 = ed25519::PrivateKey::from_seed(3).public_key();
77//! let peer3_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3003);
78//!
79//! // Configure namespace
80//! //
81//! // In production, use a unique application namespace to prevent cryptographic replay attacks.
82//! let application_namespace = b"my-app-namespace";
83//!
84//! // Configure network
85//! //
86//! // In production, use a more conservative configuration like `Config::recommended`.
87//! const MAX_MESSAGE_SIZE: usize = 1_024; // 1KB
88//! let p2p_cfg = lookup::Config::aggressive(
89//! my_sk.clone(),
90//! application_namespace,
91//! my_addr,
92//! my_addr,
93//! MAX_MESSAGE_SIZE,
94//! );
95//!
96//! // Start context
97//! runner.start(|context| async move {
98//! // Initialize network
99//! let (mut network, mut oracle) = Network::new(context.with_label("network"), p2p_cfg);
100//!
101//! // Register authorized peers
102//! //
103//! // In production, this would be updated as new peer sets are created (like when
104//! // the composition of a validator set changes).
105//! oracle.register(0, vec![(my_sk.public_key(), my_addr), (peer1, peer1_addr), (peer2, peer2_addr), (peer3, peer3_addr)]).await;
106//!
107//! // Register some channel
108//! const MAX_MESSAGE_BACKLOG: usize = 128;
109//! let (mut sender, receiver) = network.register(
110//! 0,
111//! Quota::per_second(NZU32!(1)),
112//! MAX_MESSAGE_BACKLOG,
113//! );
114//!
115//! // Run network
116//! let network_handler = network.start();
117//!
118//! // Example: Use sender
119//! let _ = sender.send(Recipients::All, bytes::Bytes::from_static(b"hello"), false).await;
120//!
121//! // Shutdown network
122//! network_handler.abort();
123//! });
124//! ```
125
126mod actors;
127mod channels;
128mod config;
129mod metrics;
130mod network;
131mod types;
132
133use thiserror::Error;
134
135/// Errors that can occur when interacting with the network.
136#[derive(Error, Debug)]
137pub enum Error {
138 #[error("message too large: {0}")]
139 MessageTooLarge(usize),
140 #[error("network closed")]
141 NetworkClosed,
142}
143
144pub use actors::tracker::Oracle;
145pub use channels::{Receiver, Sender};
146pub use config::Config;
147pub use network::Network;
148
149#[cfg(test)]
150mod tests {
151 use super::*;
152 use crate::{Receiver, Recipients, Sender};
153 use commonware_cryptography::{ed25519, PrivateKeyExt as _, Signer as _};
154 use commonware_macros::test_traced;
155 use commonware_runtime::{
156 deterministic, tokio, Clock, Metrics, Network as RNetwork, Runner, Spawner,
157 };
158 use commonware_utils::NZU32;
159 use futures::{channel::mpsc, SinkExt, StreamExt};
160 use governor::{clock::ReasonablyRealtime, Quota};
161 use rand::{CryptoRng, Rng};
162 use std::{
163 collections::HashSet,
164 net::{IpAddr, Ipv4Addr, SocketAddr},
165 time::Duration,
166 };
167
168 #[derive(Copy, Clone)]
169 enum Mode {
170 All,
171 Some,
172 One,
173 }
174
175 const DEFAULT_MESSAGE_BACKLOG: usize = 128;
176
177 /// Ensure no message rate limiting occurred.
178 ///
179 /// If a message is rate limited, it would be formatted as:
180 ///
181 /// ```text
182 /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="data_0"} 1
183 /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="ping"} 1
184 /// ```
185 fn assert_no_rate_limiting(context: &impl Metrics) {
186 let metrics = context.encode();
187 assert!(
188 !metrics.contains("messages_rate_limited_total{"),
189 "no messages should be rate limited: {metrics}"
190 );
191 }
192
193 /// Test connectivity between `n` peers.
194 ///
195 /// We set a unique `base_port` for each test to avoid "address already in use"
196 /// errors when tests are run immediately after each other.
197 async fn run_network(
198 context: impl Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics,
199 max_message_size: usize,
200 base_port: u16,
201 n: usize,
202 mode: Mode,
203 ) {
204 // Create peers
205 let mut peers_and_sks = Vec::new();
206 for i in 0..n {
207 let private_key = ed25519::PrivateKey::from_seed(i as u64);
208 let public_key = private_key.public_key();
209 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
210 peers_and_sks.push((private_key, public_key, address));
211 }
212 let peers = peers_and_sks
213 .iter()
214 .map(|(_, pub_key, addr)| (pub_key.clone(), *addr))
215 .collect::<Vec<_>>();
216
217 // Create networks
218 let (complete_sender, mut complete_receiver) = mpsc::channel(peers.len());
219 for (i, (private_key, public_key, address)) in peers_and_sks.iter().enumerate() {
220 let public_key = public_key.clone();
221
222 // Create peer context
223 let context = context.with_label(&format!("peer-{i}"));
224
225 // Create network
226 let config = Config::test(private_key.clone(), *address, max_message_size);
227 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
228
229 // Register peers
230 oracle.register(0, peers.clone()).await;
231
232 // Register basic application
233 let (mut sender, mut receiver) =
234 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
235
236 // Wait to connect to all peers, and then send messages to everyone
237 network.start();
238
239 // Send/Receive messages
240 let peers = peers.clone();
241 // All public keys (except self) sorted
242 let mut public_keys = peers
243 .iter()
244 .filter_map(|(pk, _)| {
245 if pk != &public_key {
246 Some(pk.clone())
247 } else {
248 None
249 }
250 })
251 .collect::<Vec<_>>();
252 public_keys.sort();
253 context.with_label("agent").spawn({
254 let mut complete_sender = complete_sender.clone();
255 move |context| async move {
256 // Wait for all peers to send their identity
257 context.with_label("receiver").spawn(move |_| async move {
258 // Wait for all peers to send their identity
259 let mut received = HashSet::new();
260 while received.len() < n - 1 {
261 // Ensure message equals sender identity
262 let (sender, message) = receiver.recv().await.unwrap();
263 assert_eq!(sender.as_ref(), message.as_ref());
264
265 // Add to received set
266 received.insert(sender);
267 }
268 complete_sender.send(()).await.unwrap();
269
270 // Process messages until all finished (or else sender loops could get stuck as a peer may drop)
271 loop {
272 receiver.recv().await.unwrap();
273 }
274 });
275
276 // Send identity to all peers
277 context
278 .with_label("sender")
279 .spawn(move |context| async move {
280 // Loop forever to account for unexpected message drops
281 loop {
282 match mode {
283 Mode::One => {
284 for (j, (pub_key, _)) in peers.iter().enumerate() {
285 // Don't send message to self
286 if i == j {
287 continue;
288 }
289
290 // Loop until success
291 loop {
292 let sent = sender
293 .send(
294 Recipients::One(pub_key.clone()),
295 public_key.to_vec().into(),
296 true,
297 )
298 .await
299 .unwrap();
300 if sent.len() != 1 {
301 context.sleep(Duration::from_millis(100)).await;
302 continue;
303 }
304 assert_eq!(&sent[0], pub_key);
305 break;
306 }
307 }
308 }
309 Mode::Some => {
310 // Get all peers not including self
311 let mut recipients = peers.clone();
312 recipients.remove(i);
313 recipients.sort();
314
315 // Loop until all peer sends successful
316 loop {
317 let mut sent = sender
318 .send(
319 Recipients::Some(public_keys.clone()),
320 public_key.to_vec().into(),
321 true,
322 )
323 .await
324 .unwrap();
325 if sent.len() != n - 1 {
326 context.sleep(Duration::from_millis(100)).await;
327 continue;
328 }
329
330 // Compare to expected
331 sent.sort();
332 assert_eq!(sent, public_keys);
333 break;
334 }
335 }
336 Mode::All => {
337 // Get all peers not including self
338 let mut recipients = peers.clone();
339 recipients.remove(i);
340 recipients.sort();
341
342 // Loop until all peer sends successful
343 loop {
344 let mut sent = sender
345 .send(
346 Recipients::All,
347 public_key.to_vec().into(),
348 true,
349 )
350 .await
351 .unwrap();
352 if sent.len() != n - 1 {
353 context.sleep(Duration::from_millis(100)).await;
354 continue;
355 }
356
357 // Compare to expected
358 sent.sort();
359 assert_eq!(sent, public_keys);
360 break;
361 }
362 }
363 };
364
365 // Sleep to avoid busy loop
366 context.sleep(Duration::from_secs(10)).await;
367 }
368 });
369 }
370 });
371 }
372
373 // Wait for all peers to finish
374 for _ in 0..n {
375 complete_receiver.next().await.unwrap();
376 }
377
378 // Ensure no message rate limiting occurred
379 assert_no_rate_limiting(&context);
380 }
381
382 fn run_deterministic_test(seed: u64, mode: Mode) {
383 // Configure test
384 const MAX_MESSAGE_SIZE: usize = 1_024 * 1_024; // 1MB
385 const NUM_PEERS: usize = 25;
386 const BASE_PORT: u16 = 3000;
387
388 // Run first instance
389 let executor = deterministic::Runner::seeded(seed);
390 let state = executor.start(|context| async move {
391 run_network(
392 context.clone(),
393 MAX_MESSAGE_SIZE,
394 BASE_PORT,
395 NUM_PEERS,
396 mode,
397 )
398 .await;
399 context.auditor().state()
400 });
401
402 // Compare result to second instance
403 let executor = deterministic::Runner::seeded(seed);
404 let state2 = executor.start(|context| async move {
405 run_network(
406 context.clone(),
407 MAX_MESSAGE_SIZE,
408 BASE_PORT,
409 NUM_PEERS,
410 mode,
411 )
412 .await;
413 context.auditor().state()
414 });
415 assert_eq!(state, state2);
416 }
417
418 #[test_traced]
419 #[ignore]
420 fn test_determinism_one() {
421 for i in 0..10 {
422 run_deterministic_test(i, Mode::One);
423 }
424 }
425
426 #[test_traced]
427 #[ignore]
428 fn test_determinism_some() {
429 for i in 0..10 {
430 run_deterministic_test(i, Mode::Some);
431 }
432 }
433
434 #[test_traced]
435 #[ignore]
436 fn test_determinism_all() {
437 for i in 0..10 {
438 run_deterministic_test(i, Mode::All);
439 }
440 }
441
442 #[test_traced]
443 fn test_tokio_connectivity() {
444 let cfg = tokio::Config::default();
445 let executor = tokio::Runner::new(cfg.clone());
446 executor.start(|context| async move {
447 const MAX_MESSAGE_SIZE: usize = 1_024 * 1_024; // 1MB
448 let base_port = 4000;
449 let n = 10;
450 run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
451 });
452 }
453
454 #[test_traced]
455 fn test_multi_index_oracle() {
456 // Configure test
457 let base_port = 3000;
458 let n: usize = 10;
459
460 // Initialize context
461 let executor = deterministic::Runner::default();
462 executor.start(|context| async move {
463 // Create peers
464 let mut peers_and_sks = Vec::new();
465 for i in 0..n {
466 let sk = ed25519::PrivateKey::from_seed(i as u64);
467 let pk = sk.public_key();
468 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
469 peers_and_sks.push((sk, pk, addr));
470 }
471 let peers = peers_and_sks
472 .iter()
473 .map(|(_, pk, addr)| (pk.clone(), *addr))
474 .collect::<Vec<_>>();
475
476 // Create networks
477 let mut waiters = Vec::new();
478 for (i, (peer_sk, peer_pk, peer_addr)) in peers_and_sks.iter().enumerate() {
479 // Create peer context
480 let context = context.with_label(&format!("peer-{i}"));
481
482 // Create network
483 let config = Config::test(
484 peer_sk.clone(),
485 *peer_addr,
486 1_024 * 1_024, // 1MB
487 );
488 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
489
490 // Register peers at separate indices
491 oracle.register(0, vec![peers[0].clone()]).await;
492 oracle
493 .register(1, vec![peers[1].clone(), peers[2].clone()])
494 .await;
495 oracle
496 .register(2, peers.iter().skip(2).cloned().collect())
497 .await;
498
499 // Register basic application
500 let (mut sender, mut receiver) =
501 network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
502
503 // Wait to connect to all peers, and then send messages to everyone
504 network.start();
505
506 // Send/Receive messages
507 let msg = peer_pk.clone();
508 let handler = context
509 .with_label("agent")
510 .spawn(move |context| async move {
511 if i == 0 {
512 // Loop until success
513 loop {
514 if sender
515 .send(Recipients::All, msg.to_vec().into(), true)
516 .await
517 .unwrap()
518 .len()
519 == n - 1
520 {
521 break;
522 }
523
524 // Sleep and try again (avoid busy loop)
525 context.sleep(Duration::from_millis(100)).await;
526 }
527 } else {
528 // Ensure message equals sender identity
529 let (sender, message) = receiver.recv().await.unwrap();
530 assert_eq!(sender.as_ref(), message.as_ref());
531 }
532 });
533
534 // Add to waiters
535 waiters.push(handler);
536 }
537
538 // Wait for waiters to finish (receiver before sender)
539 for waiter in waiters.into_iter().rev() {
540 waiter.await.unwrap();
541 }
542
543 // Ensure no message rate limiting occurred
544 assert_no_rate_limiting(&context);
545 });
546 }
547
548 #[test_traced]
549 fn test_message_too_large() {
550 // Configure test
551 let base_port = 3000;
552 let n: usize = 2;
553
554 // Initialize context
555 let executor = deterministic::Runner::seeded(0);
556 executor.start(|mut context| async move {
557 // Create peers
558 let mut peers_and_sks = Vec::new();
559 for i in 0..n {
560 let peer_sk = ed25519::PrivateKey::from_seed(i as u64);
561 let peer_pk = peer_sk.public_key();
562 let peer_addr =
563 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
564 peers_and_sks.push((peer_sk, peer_pk, peer_addr));
565 }
566 let peers = peers_and_sks
567 .iter()
568 .map(|(_, pk, addr)| (pk.clone(), *addr))
569 .collect::<Vec<_>>();
570
571 // Create network
572 let (sk, _, addr) = peers_and_sks[0].clone();
573 let config = Config::test(
574 sk,
575 addr,
576 1_024 * 1_024, // 1MB
577 );
578 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
579
580 // Register peers
581 oracle.register(0, peers.clone()).await;
582
583 // Register basic application
584 let (mut sender, _) =
585 network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
586
587 // Wait to connect to all peers, and then send messages to everyone
588 network.start();
589
590 // Crate random message
591 let mut msg = vec![0u8; 10 * 1024 * 1024]; // 10MB (greater than frame capacity)
592 context.fill(&mut msg[..]);
593
594 // Send message
595 let recipient = Recipients::One(peers[1].0.clone());
596 let result = sender.send(recipient, msg.into(), true).await;
597 assert!(matches!(result, Err(Error::MessageTooLarge(_))));
598 });
599 }
600
601 #[test_traced]
602 #[should_panic(expected = "no messages should be rate limited")]
603 fn test_rate_limiting() {
604 // Configure test
605 let base_port = 3000;
606 let n: usize = 2;
607
608 // Initialize context
609 let executor = deterministic::Runner::seeded(0);
610 executor.start(|context| async move {
611 // Create peers
612 let mut peers_and_sks = Vec::new();
613 for i in 0..n {
614 let sk = ed25519::PrivateKey::from_seed(i as u64);
615 let pk = sk.public_key();
616 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
617 peers_and_sks.push((sk, pk, addr));
618 }
619 let peers = peers_and_sks
620 .iter()
621 .map(|(_, pk, addr)| (pk.clone(), *addr))
622 .collect::<Vec<_>>();
623 let (sk0, _, addr0) = peers_and_sks[0].clone();
624 let (sk1, pk1, addr1) = peers_and_sks[1].clone();
625
626 // Create network for peer 0
627 let config0 = Config::test(sk0, addr0, 1_024 * 1_024); // 1MB
628 let (mut network0, mut oracle0) = Network::new(context.with_label("peer-0"), config0);
629 oracle0.register(0, peers.clone()).await;
630 let (mut sender0, _receiver0) =
631 network0.register(0, Quota::per_hour(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
632 network0.start();
633
634 // Create network for peer 1
635 let config1 = Config::test(sk1, addr1, 1_024 * 1_024); // 1MB
636 let (mut network1, mut oracle1) = Network::new(context.with_label("peer-1"), config1);
637 oracle1.register(0, peers.clone()).await;
638 let (_sender1, _receiver1) =
639 network1.register(0, Quota::per_hour(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
640 network1.start();
641
642 // Send first message, which should be allowed and consume the quota.
643 let msg = vec![0u8; 1024]; // 1KB
644 loop {
645 // Confirm message is sent to peer
646 let sent = sender0
647 .send(Recipients::One(pk1.clone()), msg.clone().into(), true)
648 .await
649 .unwrap();
650 if !sent.is_empty() {
651 break;
652 }
653
654 // Sleep and try again (avoid busy loop)
655 context.sleep(Duration::from_millis(100)).await;
656 }
657
658 // Immediately send the second message to trigger the rate limit.
659 let sent = sender0
660 .send(Recipients::One(pk1), msg.into(), true)
661 .await
662 .unwrap();
663 assert!(!sent.is_empty());
664
665 // Loop until the metrics reflect the rate-limited message.
666 for _ in 0..10 {
667 assert_no_rate_limiting(&context);
668 context.sleep(Duration::from_millis(100)).await;
669 }
670 });
671 }
672}