commonware_p2p/authenticated/lookup/
mod.rs

1//! Communicate with a fixed set of authenticated peers with known addresses over encrypted connections.
2//!
3//! `lookup` provides multiplexed communication between fully-connected peers
//! identified by a developer-specified cryptographic identity (e.g. BLS, ed25519, etc.).
5//! Unlike `discovery`, peers in `lookup` don't use a discovery mechanism to find each other;
6//! each peer's address is supplied by the application layer.
7//!
8//! # Features
9//!
10//! - Configurable Cryptography Scheme for Peer Identities (BLS, ed25519, etc.)
11//! - Multiplexing With Configurable Rate Limiting Per Channel and Send Prioritization
12//!
13//! # Design
14//!
15//! ## Discovery
16//!
17//! This module operates under the assumption that all peers are aware of and synchronized on
18//! the composition of peer sets at specific, user-provided indices (`u64`). Each index maps to a
19//! list of peer `PublicKey`/`SocketAddr` pairs (`(u64, Vec<(PublicKey, SocketAddr)>)`).
20//!
21//! On startup, the application supplies the initial set of peers. The `Oracle` actor allows
22//! the application to update peer --> address mappings so that peers can find each other.
23//!
24//! ## Messages
25//!
26//! Application-level data is exchanged using the `Data` message type. This structure contains:
27//! - `channel`: A `u32` identifier used to route the message to the correct application handler.
28//! - `message`: The arbitrary application payload as `Bytes`.
29//!
30//! The size of the `message` bytes must not exceed the configured
31//! `max_message_size`. If it does, the sending operation will fail with
32//! [Error::MessageTooLarge]. Messages can be sent with `priority`, allowing certain
33//! communications to potentially bypass lower-priority messages waiting in send queues across all
34//! channels. Each registered channel ([Sender], [Receiver]) handles its own message queuing
35//! and rate limiting.
36//!
37//! ## Compression
38//!
39//! Stream compression is not provided at the transport layer to avoid inadvertently
40//! enabling known attacks such as BREACH and CRIME. These attacks exploit the interaction
41//! between compression and encryption by analyzing patterns in the resulting data.
42//! By compressing secrets alongside attacker-controlled content, these attacks can infer
43//! sensitive information through compression ratio analysis. Applications that choose
44//! to compress data should do so with full awareness of these risks and implement
45//! appropriate mitigations (such as ensuring no attacker-controlled data is compressed
46//! alongside sensitive information).
47//!
48//! # Example
49//!
50//! ```rust
51//! use commonware_p2p::{authenticated::lookup::{self, Network}, Sender, Recipients};
52//! use commonware_cryptography::{ed25519, Signer, PrivateKey as _, PublicKey as _, PrivateKeyExt as _};
53//! use commonware_runtime::{tokio, Spawner, Runner, Metrics};
54//! use commonware_utils::NZU32;
55//! use governor::Quota;
56//! use std::net::{IpAddr, Ipv4Addr, SocketAddr};
57//!
58//! // Configure context
59//! let runtime_cfg = tokio::Config::default();
60//! let runner = tokio::Runner::new(runtime_cfg.clone());
61//!
62//! // Generate identity
63//! //
64//! // In production, the signer should be generated from a secure source of entropy.
65//! let my_sk = ed25519::PrivateKey::from_seed(0);
66//! let my_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3000);
67//!
68//! // Generate peers
69//! //
70//! // In production, peer identities will be provided by some external source of truth
71//! // (like the staking set of a blockchain).
72//! let peer1 = ed25519::PrivateKey::from_seed(1).public_key();
73//! let peer1_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3001);
74//! let peer2 = ed25519::PrivateKey::from_seed(2).public_key();
75//! let peer2_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3002);
76//! let peer3 = ed25519::PrivateKey::from_seed(3).public_key();
77//! let peer3_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3003);
78//!
79//! // Configure namespace
80//! //
81//! // In production, use a unique application namespace to prevent cryptographic replay attacks.
82//! let application_namespace = b"my-app-namespace";
83//!
84//! // Configure network
85//! //
86//! // In production, use a more conservative configuration like `Config::recommended`.
87//! const MAX_MESSAGE_SIZE: usize = 1_024; // 1KB
88//! let p2p_cfg = lookup::Config::aggressive(
89//!     my_sk.clone(),
90//!     application_namespace,
91//!     my_addr,
92//!     my_addr,
93//!     MAX_MESSAGE_SIZE,
94//! );
95//!
96//! // Start context
97//! runner.start(|context| async move {
98//!     // Initialize network
99//!     let (mut network, mut oracle) = Network::new(context.with_label("network"), p2p_cfg);
100//!
101//!     // Register authorized peers
102//!     //
103//!     // In production, this would be updated as new peer sets are created (like when
104//!     // the composition of a validator set changes).
105//!     oracle.register(0, vec![(my_sk.public_key(), my_addr), (peer1, peer1_addr), (peer2, peer2_addr), (peer3, peer3_addr)]).await;
106//!
107//!     // Register some channel
108//!     const MAX_MESSAGE_BACKLOG: usize = 128;
109//!     let (mut sender, receiver) = network.register(
110//!         0,
111//!         Quota::per_second(NZU32!(1)),
112//!         MAX_MESSAGE_BACKLOG,
113//!     );
114//!
115//!     // Run network
116//!     let network_handler = network.start();
117//!
118//!     // Example: Use sender
119//!     let _ = sender.send(Recipients::All, bytes::Bytes::from_static(b"hello"), false).await;
120//!
121//!     // Shutdown network
122//!     network_handler.abort();
123//! });
124//! ```
125
126mod actors;
127mod channels;
128mod config;
129mod metrics;
130mod network;
131mod types;
132
133use thiserror::Error;
134
135/// Errors that can occur when interacting with the network.
136#[derive(Error, Debug)]
137pub enum Error {
138    #[error("message too large: {0}")]
139    MessageTooLarge(usize),
140    #[error("network closed")]
141    NetworkClosed,
142}
143
144pub use actors::tracker::Oracle;
145pub use channels::{Receiver, Sender};
146pub use config::Config;
147pub use network::Network;
148
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Receiver, Recipients, Sender};
    use commonware_cryptography::{ed25519, PrivateKeyExt as _, Signer as _};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        deterministic, tokio, Clock, Metrics, Network as RNetwork, Runner, Spawner,
    };
    use commonware_utils::NZU32;
    use futures::{channel::mpsc, SinkExt, StreamExt};
    use governor::{clock::ReasonablyRealtime, Quota};
    use rand::{CryptoRng, Rng};
    use std::{
        collections::HashSet,
        net::{IpAddr, Ipv4Addr, SocketAddr},
        time::Duration,
    };

    /// Addressing strategy each peer uses when sending its identity in `run_network`.
    #[derive(Copy, Clone)]
    enum Mode {
        /// Broadcast with `Recipients::All`.
        All,
        /// Address every other peer explicitly with `Recipients::Some`.
        Some,
        /// Send to every other peer one at a time with `Recipients::One`.
        One,
    }

    /// Backlog size passed to every `Network::register` call in these tests.
    const DEFAULT_MESSAGE_BACKLOG: usize = 128;

    /// Ensure no message rate limiting occurred.
    ///
    /// If a message is rate limited, it would be formatted as:
    ///
    /// ```text
    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="data_0"} 1
    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="ping"} 1
    /// ```
    ///
    /// Panics (with the encoded metrics in the message) if any such line is present.
    fn assert_no_rate_limiting(context: &impl Metrics) {
        let metrics = context.encode();
        assert!(
            !metrics.contains("messages_rate_limited_total{"),
            "no messages should be rate limited: {metrics}"
        );
    }

    /// Test connectivity between `n` peers.
    ///
    /// Every peer sends its own public key (as the message payload) to all other
    /// peers using the addressing strategy selected by `mode`, and the function
    /// returns once every peer has received an identity message from each of the
    /// other `n - 1` peers.
    ///
    /// We set a unique `base_port` for each test to avoid "address already in use"
    /// errors when tests are run immediately after each other.
    async fn run_network(
        context: impl Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics,
        max_message_size: usize,
        base_port: u16,
        n: usize,
        mode: Mode,
    ) {
        // Create peers
        let peers_and_sks = (0..n)
            .map(|i| {
                let private_key = ed25519::PrivateKey::from_seed(i as u64);
                let public_key = private_key.public_key();
                let address =
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
                (private_key, public_key, address)
            })
            .collect::<Vec<_>>();
        let peers = peers_and_sks
            .iter()
            .map(|(_, public_key, address)| (public_key.clone(), *address))
            .collect::<Vec<_>>();

        // Create networks
        let (complete_sender, mut complete_receiver) = mpsc::channel(peers.len());
        for (i, (private_key, public_key, address)) in peers_and_sks.iter().enumerate() {
            let public_key = public_key.clone();

            // Create peer context
            let context = context.with_label(&format!("peer-{i}"));

            // Create network
            let config = Config::test(private_key.clone(), *address, max_message_size);
            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

            // Register peers
            oracle.register(0, peers.clone()).await;

            // Register basic application
            let (mut sender, mut receiver) =
                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);

            // Wait to connect to all peers, and then send messages to everyone
            network.start();

            // Send/Receive messages
            let peers = peers.clone();

            // All public keys (except self) sorted
            let mut public_keys = peers
                .iter()
                .map(|(pk, _)| pk)
                .filter(|pk| **pk != public_key)
                .cloned()
                .collect::<Vec<_>>();
            public_keys.sort();

            context.with_label("agent").spawn({
                let mut complete_sender = complete_sender.clone();
                move |context| async move {
                    // Wait for all peers to send their identity
                    context.with_label("receiver").spawn(move |_| async move {
                        let mut received = HashSet::new();
                        while received.len() < n - 1 {
                            // Ensure message equals sender identity
                            let (origin, message) = receiver.recv().await.unwrap();
                            assert_eq!(origin.as_ref(), message.as_ref());

                            // Add to received set
                            received.insert(origin);
                        }
                        complete_sender.send(()).await.unwrap();

                        // Process messages until all finished (or else sender loops could get stuck as a peer may drop)
                        loop {
                            receiver.recv().await.unwrap();
                        }
                    });

                    // Send identity to all peers
                    context
                        .with_label("sender")
                        .spawn(move |context| async move {
                            // Loop forever to account for unexpected message drops
                            loop {
                                match mode {
                                    Mode::One => {
                                        for (j, (pub_key, _)) in peers.iter().enumerate() {
                                            // Don't send message to self
                                            if i == j {
                                                continue;
                                            }

                                            // Loop until success
                                            loop {
                                                let sent = sender
                                                    .send(
                                                        Recipients::One(pub_key.clone()),
                                                        public_key.to_vec().into(),
                                                        true,
                                                    )
                                                    .await
                                                    .unwrap();
                                                if sent.len() != 1 {
                                                    context.sleep(Duration::from_millis(100)).await;
                                                    continue;
                                                }
                                                assert_eq!(&sent[0], pub_key);
                                                break;
                                            }
                                        }
                                    }
                                    Mode::Some => {
                                        // Loop until all peer sends successful
                                        loop {
                                            let mut sent = sender
                                                .send(
                                                    Recipients::Some(public_keys.clone()),
                                                    public_key.to_vec().into(),
                                                    true,
                                                )
                                                .await
                                                .unwrap();
                                            if sent.len() != n - 1 {
                                                context.sleep(Duration::from_millis(100)).await;
                                                continue;
                                            }

                                            // Compare to expected
                                            sent.sort();
                                            assert_eq!(sent, public_keys);
                                            break;
                                        }
                                    }
                                    Mode::All => {
                                        // Loop until all peer sends successful
                                        loop {
                                            let mut sent = sender
                                                .send(
                                                    Recipients::All,
                                                    public_key.to_vec().into(),
                                                    true,
                                                )
                                                .await
                                                .unwrap();
                                            if sent.len() != n - 1 {
                                                context.sleep(Duration::from_millis(100)).await;
                                                continue;
                                            }

                                            // Compare to expected
                                            sent.sort();
                                            assert_eq!(sent, public_keys);
                                            break;
                                        }
                                    }
                                };

                                // Sleep to avoid busy loop
                                context.sleep(Duration::from_secs(10)).await;
                            }
                        });
                }
            });
        }

        // Wait for all peers to finish
        for _ in 0..n {
            complete_receiver.next().await.unwrap();
        }

        // Ensure no message rate limiting occurred
        assert_no_rate_limiting(&context);
    }

    /// Run `run_network` twice on deterministic runners seeded with `seed` and
    /// assert that both executions end with the same auditor state.
    fn run_deterministic_test(seed: u64, mode: Mode) {
        // Configure test
        const MAX_MESSAGE_SIZE: usize = 1_024 * 1_024; // 1MB
        const NUM_PEERS: usize = 25;
        const BASE_PORT: u16 = 3000;

        // A single seeded execution that reports the final auditor state
        let run = || {
            let executor = deterministic::Runner::seeded(seed);
            executor.start(|context| async move {
                run_network(
                    context.clone(),
                    MAX_MESSAGE_SIZE,
                    BASE_PORT,
                    NUM_PEERS,
                    mode,
                )
                .await;
                context.auditor().state()
            })
        };

        // Compare the first instance to the second instance
        let state = run();
        let state2 = run();
        assert_eq!(state, state2);
    }

    /// Determinism check for `Mode::One` across seeds 0 through 9.
    #[test_traced]
    #[ignore]
    fn test_determinism_one() {
        for seed in 0..10 {
            run_deterministic_test(seed, Mode::One);
        }
    }

    /// Determinism check for `Mode::Some` across seeds 0 through 9.
    #[test_traced]
    #[ignore]
    fn test_determinism_some() {
        for seed in 0..10 {
            run_deterministic_test(seed, Mode::Some);
        }
    }

    /// Determinism check for `Mode::All` across seeds 0 through 9.
    #[test_traced]
    #[ignore]
    fn test_determinism_all() {
        for seed in 0..10 {
            run_deterministic_test(seed, Mode::All);
        }
    }

    /// Connectivity check for 10 peers on the tokio runtime using `Mode::One`.
    #[test_traced]
    fn test_tokio_connectivity() {
        let executor = tokio::Runner::new(tokio::Config::default());
        executor.start(|context| async move {
            const MAX_MESSAGE_SIZE: usize = 1_024 * 1_024; // 1MB
            let base_port = 4000;
            let n = 10;
            run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
        });
    }

    /// Peers registered across several oracle indices can still all be reached:
    /// peer 0 broadcasts until `n - 1` recipients are reported, and every other
    /// peer checks that the message it receives equals the sender's identity.
    #[test_traced]
    fn test_multi_index_oracle() {
        // Configure test
        let base_port = 3000;
        let n: usize = 10;

        // Initialize context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let peers_and_sks = (0..n)
                .map(|i| {
                    let sk = ed25519::PrivateKey::from_seed(i as u64);
                    let pk = sk.public_key();
                    let addr =
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
                    (sk, pk, addr)
                })
                .collect::<Vec<_>>();
            let peers = peers_and_sks
                .iter()
                .map(|(_, pk, addr)| (pk.clone(), *addr))
                .collect::<Vec<_>>();

            // Create networks
            let mut waiters = Vec::new();
            for (i, (peer_sk, peer_pk, peer_addr)) in peers_and_sks.iter().enumerate() {
                // Create peer context
                let context = context.with_label(&format!("peer-{i}"));

                // Create network
                let config = Config::test(
                    peer_sk.clone(),
                    *peer_addr,
                    1_024 * 1_024, // 1MB
                );
                let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

                // Register peers at separate indices
                oracle.register(0, vec![peers[0].clone()]).await;
                oracle
                    .register(1, vec![peers[1].clone(), peers[2].clone()])
                    .await;
                oracle
                    .register(2, peers.iter().skip(2).cloned().collect())
                    .await;

                // Register basic application
                let (mut sender, mut receiver) =
                    network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);

                // Wait to connect to all peers, and then send messages to everyone
                network.start();

                // Send/Receive messages
                let msg = peer_pk.clone();
                let handler = context
                    .with_label("agent")
                    .spawn(move |context| async move {
                        if i == 0 {
                            // Loop until success
                            loop {
                                let sent = sender
                                    .send(Recipients::All, msg.to_vec().into(), true)
                                    .await
                                    .unwrap();
                                if sent.len() == n - 1 {
                                    break;
                                }

                                // Sleep and try again (avoid busy loop)
                                context.sleep(Duration::from_millis(100)).await;
                            }
                        } else {
                            // Ensure message equals sender identity
                            let (origin, message) = receiver.recv().await.unwrap();
                            assert_eq!(origin.as_ref(), message.as_ref());
                        }
                    });

                // Add to waiters
                waiters.push(handler);
            }

            // Wait for waiters to finish (receiver before sender)
            for waiter in waiters.into_iter().rev() {
                waiter.await.unwrap();
            }

            // Ensure no message rate limiting occurred
            assert_no_rate_limiting(&context);
        });
    }

    /// Sending a 10MB payload through a network configured with a 1MB maximum
    /// message size must fail with `Error::MessageTooLarge`.
    #[test_traced]
    fn test_message_too_large() {
        // Configure test
        let base_port = 3000;
        let n: usize = 2;

        // Initialize context
        let executor = deterministic::Runner::seeded(0);
        executor.start(|mut context| async move {
            // Create peers
            let peers_and_sks = (0..n)
                .map(|i| {
                    let peer_sk = ed25519::PrivateKey::from_seed(i as u64);
                    let peer_pk = peer_sk.public_key();
                    let peer_addr =
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
                    (peer_sk, peer_pk, peer_addr)
                })
                .collect::<Vec<_>>();
            let peers = peers_and_sks
                .iter()
                .map(|(_, pk, addr)| (pk.clone(), *addr))
                .collect::<Vec<_>>();

            // Create network
            let (sk, _, addr) = peers_and_sks[0].clone();
            let config = Config::test(
                sk,
                addr,
                1_024 * 1_024, // 1MB
            );
            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

            // Register peers
            oracle.register(0, peers.clone()).await;

            // Register basic application
            let (mut sender, _) =
                network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);

            // Wait to connect to all peers, and then send messages to everyone
            network.start();

            // Create random message
            let mut msg = vec![0u8; 10 * 1024 * 1024]; // 10MB (greater than frame capacity)
            context.fill(&mut msg[..]);

            // Send message
            let recipient = Recipients::One(peers[1].0.clone());
            let result = sender.send(recipient, msg.into(), true).await;
            assert!(matches!(result, Err(Error::MessageTooLarge(_))));
        });
    }

    /// With a quota of one message per hour, a second message sent right after
    /// the first should be rate limited.
    ///
    /// The test passes only when `assert_no_rate_limiting` panics, i.e. when the
    /// rate-limited metric shows up within the polling window below.
    #[test_traced]
    #[should_panic(expected = "no messages should be rate limited")]
    fn test_rate_limiting() {
        // Configure test
        let base_port = 3000;
        let n: usize = 2;

        // Initialize context
        let executor = deterministic::Runner::seeded(0);
        executor.start(|context| async move {
            // Create peers
            let peers_and_sks = (0..n)
                .map(|i| {
                    let sk = ed25519::PrivateKey::from_seed(i as u64);
                    let pk = sk.public_key();
                    let addr =
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
                    (sk, pk, addr)
                })
                .collect::<Vec<_>>();
            let peers = peers_and_sks
                .iter()
                .map(|(_, pk, addr)| (pk.clone(), *addr))
                .collect::<Vec<_>>();
            let (sk0, _, addr0) = peers_and_sks[0].clone();
            let (sk1, pk1, addr1) = peers_and_sks[1].clone();

            // Create network for peer 0
            let config0 = Config::test(sk0, addr0, 1_024 * 1_024); // 1MB
            let (mut network0, mut oracle0) = Network::new(context.with_label("peer-0"), config0);
            oracle0.register(0, peers.clone()).await;
            let (mut sender0, _receiver0) =
                network0.register(0, Quota::per_hour(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
            network0.start();

            // Create network for peer 1
            let config1 = Config::test(sk1, addr1, 1_024 * 1_024); // 1MB
            let (mut network1, mut oracle1) = Network::new(context.with_label("peer-1"), config1);
            oracle1.register(0, peers).await;
            let (_sender1, _receiver1) =
                network1.register(0, Quota::per_hour(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
            network1.start();

            // Send first message, which should be allowed and consume the quota.
            let msg = vec![0u8; 1024]; // 1KB
            loop {
                // Confirm message is sent to peer
                let sent = sender0
                    .send(Recipients::One(pk1.clone()), msg.clone().into(), true)
                    .await
                    .unwrap();
                if !sent.is_empty() {
                    break;
                }

                // Sleep and try again (avoid busy loop)
                context.sleep(Duration::from_millis(100)).await;
            }

            // Immediately send the second message to trigger the rate limit.
            let sent = sender0
                .send(Recipients::One(pk1), msg.into(), true)
                .await
                .unwrap();
            assert!(!sent.is_empty());

            // Poll the metrics until they reflect the rate-limited message (the
            // assertion below is expected to panic once they do).
            for _ in 0..10 {
                assert_no_rate_limiting(&context);
                context.sleep(Duration::from_millis(100)).await;
            }
        });
    }
}