//! Communicate with a fixed set of authenticated peers without known addresses over encrypted connections.
//!
//! `discovery` provides multiplexed communication between fully-connected peers
//! identified by a developer-specified cryptographic identity (e.g., BLS or ed25519).
//! Peer discovery occurs automatically using ordered bit vectors (sorted by authorized
//! cryptographic identities) to efficiently communicate knowledge of dialable peers.
//!
//! # Features
//!
//! - Configurable Cryptography Scheme for Peer Identities (BLS, ed25519, etc.)
//! - Automatic Peer Discovery Using Bit Vectors (Also Used as Ping Messages)
//! - Multiplexing With Configurable Rate Limiting Per Channel and Send Prioritization
//!
//! # Design
//!
//! ## Discovery
//!
//! Peer discovery operates under the assumption that all peers are aware of and synchronized on
//! the composition of peer sets at specific, user-provided indices (`u64`). Each index maps to a
//! list of authorized `PublicKey`s (`(u64, Vec<PublicKey>)`). Based on this shared knowledge, each
//! peer can construct a sorted bit vector message (`BitVec`) representing its knowledge of the
//! dialable addresses [std::net::SocketAddr] for the peers in that set.
//! The `BitVec` message contains:
//! - `index`: The `u64` index the bit vector applies to.
//! - `bits`: The bit vector itself, where a '1' signifies knowledge of the corresponding
//!   peer's address in the sorted list for that index.
//!
//! Even if a peer knows the address of another peer in the set, if it fails to dial that
//! peer a (configurable) number of times, it assumes the information is stale and marks it as
//! '0' (unknown) in the bit vector. This prevents peers from being "stuck" with outdated
//! information about other peers.
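//!
/*!
The bit vector construction and the stale-address rule described above can be sketched
as follows. This is a minimal illustration under stated assumptions, not this module's
implementation: `PeerKey`, `build_bits`, and `record_dial_failure` are hypothetical names,
and a `Vec<bool>` stands in for the packed bit vector.

```rust
use std::collections::{BTreeMap, BTreeSet};
use std::net::SocketAddr;

// Hypothetical stand-in for a `PublicKey` (any totally ordered key works).
type PeerKey = Vec<u8>;

// Build the bits for one peer set: position `i` is `true` when a dialable
// address is known for the `i`-th key in sorted order.
fn build_bits(
    peer_set: &BTreeSet<PeerKey>,
    known: &BTreeMap<PeerKey, SocketAddr>,
) -> Vec<bool> {
    peer_set.iter().map(|key| known.contains_key(key)).collect()
}

// Count a failed dial; once `max_failures` is reached, forget the address so
// the corresponding bit reverts to `false` (unknown).
fn record_dial_failure(
    known: &mut BTreeMap<PeerKey, SocketAddr>,
    failures: &mut BTreeMap<PeerKey, usize>,
    key: &PeerKey,
    max_failures: usize,
) {
    let count = failures.entry(key.clone()).or_insert(0);
    *count += 1;
    if *count >= max_failures {
        known.remove(key);
        failures.remove(key);
    }
}
```

Because a `BTreeSet` iterates in sorted order, every peer that holds the same set derives
the same position for each key, which is what lets the bits be interpreted without
sending the keys themselves.
*/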
//!
//! _Warning: If peers are not synchronized on the peer set composition at a given index,
//! discovery messages can be misinterpreted. A peer might associate a bit vector index with the
//! wrong peer or fail to parse the vector if its length doesn't match the expected set size. The
//! application layer is responsible for ensuring peer set synchronization._
//!
//! Due to their small size, these `BitVec` messages are exchanged periodically (configured by
//! `gossip_bit_vec_frequency` in the [Config]) between connected peers. This serves as both a
//! peer discovery mechanism and a keep-alive "ping" message to maintain the underlying
//! connection, especially during periods of low application-level traffic. The protocol supports
//! tracking multiple peer sets concurrently (up to `tracked_peer_sets`), each identified by its
//! `index`. This is useful, for instance, during transitions like distributed key generation
//! (DKG) where connections to both old and new peer sets are needed simultaneously.
//!
//! Upon receiving a `BitVec` message, a peer compares it against its own knowledge for the same
//! index. If the receiving peer knows addresses that the sender marked as '0' (unknown), it
//! selects a random subset of these known `PeerInfo` structures (up to `peer_gossip_max_count`)
//! and sends them back in a `Payload::Peers` message. To save bandwidth, peers will only gossip
//! `PeerInfo` for peers that they currently have a connection with. This prevents them from
//! repeatedly sending `PeerInfo` that they cannot verify is still valid. Each `PeerInfo` contains:
//! - `socket`: The [std::net::SocketAddr] of the peer.
//! - `timestamp`: A `u64` timestamp indicating when the address was attested.
//! - `public_key`: The peer's public key.
//! - `signature`: The peer's cryptographic signature over the `socket` and `timestamp`.
//!
//! If the receiver doesn't know any addresses the sender is unaware of, it sends no
//! `Payload::Peers` response; the received `BitVec` implicitly acts as a "pong".
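//!
/*!
The comparison a receiver performs on an incoming `BitVec` can be sketched roughly as
below. The function name `gossip_candidates` and the `Vec<bool>` representation are
assumptions made for illustration. For determinism, the sketch takes the first
`max_count` candidates, whereas the protocol described above selects a random subset.

```rust
// Return the positions (in the sorted peer set) that could be gossiped back to
// the sender: known to us, currently connected, and marked unknown by the sender.
//
// Returns `None` if the vectors disagree on length, which indicates the two
// peers are not synchronized on the peer set at this index.
fn gossip_candidates(
    ours: &[bool],
    theirs: &[bool],
    connected: &[bool],
    max_count: usize,
) -> Option<Vec<usize>> {
    if ours.len() != theirs.len() || ours.len() != connected.len() {
        return None;
    }
    let picks = (0..ours.len())
        .filter(|&i| ours[i] && connected[i] && !theirs[i])
        .take(max_count) // assumption: first-N instead of a random subset
        .collect();
    Some(picks)
}
```

An empty result corresponds to the "pong" case: nothing is sent back and the received
bit vector only serves as a keep-alive.
*/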
//!
//! If a peer receives a `PeerInfo` message (either directly or through gossip) containing a more
//! recent timestamp for a known peer's address, it updates its local `Record`. This updated
//! `PeerInfo` is also used in future gossip messages. Each peer generates its own signed
//! `PeerInfo` upon startup and sends it immediately after establishing a connection (following
//! the cryptographic handshake). This ensures that if a peer connects using an outdated address
//! record, it will be corrected promptly by the peer being dialed.
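//!
/*!
The "more recent timestamp wins" rule can be illustrated with a small sketch. The `Info`
struct and `update_record` function are hypothetical simplifications: signature
verification is omitted here and would have to happen before any update is considered.

```rust
use std::net::SocketAddr;

// Hypothetical, unsigned stand-in for `PeerInfo`.
#[derive(Clone, Debug, PartialEq)]
struct Info {
    socket: SocketAddr,
    timestamp: u64,
}

// Replace the stored attestation only if the incoming one is strictly newer.
// Returns `true` if the record changed.
fn update_record(current: &mut Option<Info>, incoming: Info) -> bool {
    let is_newer = current
        .as_ref()
        .map_or(true, |existing| incoming.timestamp > existing.timestamp);
    if is_newer {
        *current = Some(incoming);
    }
    is_newer
}
```

Requiring a strictly greater timestamp means a replayed attestation with the same
timestamp cannot overwrite the stored address.
*/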
//!
//! To initiate the discovery process, a peer needs a list of `bootstrappers` (defined in
//! [Config]): known peer public keys and their corresponding socket addresses. The peer
//! attempts to dial these bootstrappers, performs the handshake, sends its own `PeerInfo`, and
//! then sends a `BitVec` for the relevant peer set(s) (initially only knowing its own address,
//! marked as '1'). It then waits for responses, learning about other peers through the
//! `Payload::Peers` messages received. Bootstrapper information is persisted, and connections to
//! them are maintained even if they aren't part of any currently tracked peer sets. Different
//! peers can have different bootstrapper lists.
//!
//! _Note: If a peer (listener) receives a connection request from another peer (dialer) that
//! belongs to a registered peer set, the listener will accept the connection, even if the
//! listener itself hasn't yet learned about that specific peer set (or has an older version). The
//! core requirement is that the listener recognizes the *dialer's public key* as belonging to
//! *some* authorized set it tracks (see `actors::tracker::Actor`). This mechanism allows peers
//! with more up-to-date peer set information to connect and propagate that information, enabling
//! the listener to potentially learn about newer sets it is part of._
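//!
/*!
The acceptance rule in the note above amounts to a membership check across every tracked
peer set. The sketch below is illustrative only: `TrackedSets` is a hypothetical type,
and evicting the lowest index once more than `max_sets` sets are registered is an
assumption about how a `tracked_peer_sets` limit might be enforced.

```rust
use std::collections::{BTreeMap, BTreeSet};

// Hypothetical stand-in for a `PublicKey`.
type PeerKey = Vec<u8>;

// Hypothetical tracker holding at most `max_sets` peer sets, keyed by index.
struct TrackedSets {
    max_sets: usize,
    sets: BTreeMap<u64, BTreeSet<PeerKey>>,
}

impl TrackedSets {
    fn new(max_sets: usize) -> Self {
        Self { max_sets, sets: BTreeMap::new() }
    }

    // Register a peer set at `index`, dropping the lowest indices once the
    // limit is exceeded (assumed eviction policy).
    fn register(&mut self, index: u64, peers: impl IntoIterator<Item = PeerKey>) {
        self.sets.insert(index, peers.into_iter().collect());
        while self.sets.len() > self.max_sets {
            self.sets.pop_first();
        }
    }

    // A dialer is accepted if its key appears in any tracked set.
    fn is_authorized(&self, dialer: &PeerKey) -> bool {
        self.sets.values().any(|set| set.contains(dialer))
    }
}
```

Checking all tracked sets, rather than only the newest, is what allows connections to
both old and new peer sets during a transition.
*/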
//!
//! ## Messages
//!
//! Application-level data is exchanged using the `Payload::Data` message type, which wraps an
//! internal `Data` structure. This structure contains:
//! - `channel`: A `u32` identifier used to route the message to the correct application handler.
//! - `message`: The arbitrary application payload as `Bytes`.
//!
//! The size of the `message` bytes must not exceed the configured
//! `max_message_size`. If it does, the sending operation will fail with
//! [Error::MessageTooLarge]. Messages can be sent with `priority`, allowing certain
//! communications to potentially bypass lower-priority messages waiting in send queues across all
//! channels. Each registered channel ([Sender], [Receiver]) handles its own message queuing
//! and rate limiting.
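//!
/*!
The size check and the effect of `priority` can be pictured with the following sketch.
`Outbox`, `SendError`, and the two-queue layout are assumptions chosen to illustrate the
behavior described above; they are not the types or the queuing strategy used by this
module.

```rust
use std::collections::VecDeque;

// Hypothetical error mirroring the role of `Error::MessageTooLarge`.
#[derive(Debug, PartialEq)]
enum SendError {
    MessageTooLarge(usize),
}

// Hypothetical outbound buffer with separate high- and low-priority queues.
struct Outbox {
    max_message_size: usize,
    high: VecDeque<Vec<u8>>,
    low: VecDeque<Vec<u8>>,
}

impl Outbox {
    fn new(max_message_size: usize) -> Self {
        Self { max_message_size, high: VecDeque::new(), low: VecDeque::new() }
    }

    // Reject oversized messages up front; otherwise queue by priority.
    fn enqueue(&mut self, message: Vec<u8>, priority: bool) -> Result<(), SendError> {
        if message.len() > self.max_message_size {
            return Err(SendError::MessageTooLarge(message.len()));
        }
        if priority {
            self.high.push_back(message);
        } else {
            self.low.push_back(message);
        }
        Ok(())
    }

    // Drain priority messages first, so they bypass waiting low-priority ones.
    fn dequeue(&mut self) -> Option<Vec<u8>> {
        self.high.pop_front().or_else(|| self.low.pop_front())
    }
}
```

Rejecting an oversized message at enqueue time surfaces the error to the caller
immediately instead of after the message has waited in a queue.
*/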
//!
//! ## Compression
//!
//! Stream compression is not provided at the transport layer to avoid inadvertently
//! enabling known attacks such as BREACH and CRIME. These attacks exploit the interaction
//! between compression and encryption by analyzing patterns in the resulting data.
//! By compressing secrets alongside attacker-controlled content, these attacks can infer
//! sensitive information through compression ratio analysis. Applications that choose
//! to compress data should do so with full awareness of these risks and implement
//! appropriate mitigations (such as ensuring no attacker-controlled data is compressed
//! alongside sensitive information).
//!
//! # Example
//!
//! ```rust
//! use commonware_p2p::{authenticated::discovery::{self, Network}, Sender, Recipients};
//! use commonware_cryptography::{ed25519, Signer, PrivateKey as _, PublicKey as _, PrivateKeyExt as _};
//! use commonware_runtime::{tokio, Spawner, Runner, Metrics};
//! use commonware_utils::NZU32;
//! use governor::Quota;
//! use std::net::{IpAddr, Ipv4Addr, SocketAddr};
//!
//! // Configure context
//! let runtime_cfg = tokio::Config::default();
//! let runner = tokio::Runner::new(runtime_cfg.clone());
//!
//! // Generate identity
//! //
//! // In production, the signer should be generated from a secure source of entropy.
//! let signer = ed25519::PrivateKey::from_seed(0);
//!
//! // Generate peers
//! //
//! // In production, peer identities will be provided by some external source of truth
//! // (like the staking set of a blockchain).
//! let peer1 = ed25519::PrivateKey::from_seed(1).public_key();
//! let peer2 = ed25519::PrivateKey::from_seed(2).public_key();
//! let peer3 = ed25519::PrivateKey::from_seed(3).public_key();
//!
//! // Configure bootstrappers
//! //
//! // In production, it is likely that the address of bootstrappers will be some public address.
//! let bootstrappers = vec![(peer1.clone(), SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3001))];
//!
//! // Configure namespace
//! //
//! // In production, use a unique application namespace to prevent cryptographic replay attacks.
//! let application_namespace = b"my-app-namespace";
//!
//! // Configure network
//! //
//! // In production, use a more conservative configuration like `Config::recommended`.
//! const MAX_MESSAGE_SIZE: usize = 1_024; // 1KB
//! let p2p_cfg = discovery::Config::aggressive(
//!     signer.clone(),
//!     application_namespace,
//!     SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 3000),
//!     SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3000), // Use a specific dialable addr
//!     bootstrappers,
//!     MAX_MESSAGE_SIZE,
//! );
//!
//! // Start context
//! runner.start(|context| async move {
//!     // Initialize network
//!     let (mut network, mut oracle) = Network::new(context.with_label("network"), p2p_cfg);
//!
//!     // Register authorized peers
//!     //
//!     // In production, this would be updated as new peer sets are created (like when
//!     // the composition of a validator set changes).
//!     oracle.register(0, vec![signer.public_key(), peer1, peer2, peer3]).await;
//!
//!     // Register some channel
//!     const MAX_MESSAGE_BACKLOG: usize = 128;
//!     let (mut sender, receiver) = network.register(
//!         0,
//!         Quota::per_second(NZU32!(1)),
//!         MAX_MESSAGE_BACKLOG,
//!     );
//!
//!     // Run network
//!     let network_handler = network.start();
//!
//!     // Example: Use sender
//!     let _ = sender.send(Recipients::All, bytes::Bytes::from_static(b"hello"), false).await;
//!
//!     // Shutdown network
//!     network_handler.abort();
//! });
//! ```

mod actors;
mod channels;
mod config;
mod metrics;
mod network;
mod types;

use thiserror::Error;

/// Errors that can occur when interacting with the network.
#[derive(Error, Debug)]
pub enum Error {
    #[error("message too large: {0}")]
    MessageTooLarge(usize),
    #[error("network closed")]
    NetworkClosed,
}

pub use actors::tracker::Oracle;
pub use channels::{Receiver, Sender};
pub use config::{Bootstrapper, Config};
pub use network::Network;

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Receiver, Recipients, Sender};
    use commonware_cryptography::{ed25519, PrivateKeyExt as _, Signer as _};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        deterministic, tokio, Clock, Metrics, Network as RNetwork, Runner, Spawner,
    };
    use commonware_utils::NZU32;
    use futures::{channel::mpsc, SinkExt, StreamExt};
    use governor::{clock::ReasonablyRealtime, Quota};
    use rand::{CryptoRng, Rng};
    use std::{
        collections::HashSet,
        net::{IpAddr, Ipv4Addr, SocketAddr},
        time::Duration,
    };

    #[derive(Copy, Clone)]
    enum Mode {
        All,
        Some,
        One,
    }

    const DEFAULT_MESSAGE_BACKLOG: usize = 128;

    /// Ensure no message rate limiting occurred.
    ///
    /// If a message is rate limited, it would be formatted as:
    ///
    /// ```text
    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="data_0"} 1
    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="ping"} 1
    /// ```
    fn assert_no_rate_limiting(context: &impl Metrics) {
        let metrics = context.encode();
        assert!(
            !metrics.contains("messages_rate_limited_total{"),
            "no messages should be rate limited: {metrics}"
        );
    }

    /// Test connectivity between `n` peers.
    ///
    /// We set a unique `base_port` for each test to avoid "address already in use"
    /// errors when tests are run immediately after each other.
    async fn run_network(
        context: impl Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics,
        max_message_size: usize,
        base_port: u16,
        n: usize,
        mode: Mode,
    ) {
        // Create peers
        let mut peers = Vec::new();
        for i in 0..n {
            peers.push(ed25519::PrivateKey::from_seed(i as u64));
        }
        let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

        // Create networks
        let (complete_sender, mut complete_receiver) = mpsc::channel(peers.len());
        for (i, peer) in peers.iter().enumerate() {
            // Create peer context
            let context = context.with_label(&format!("peer-{i}"));

            // Derive port
            let port = base_port + i as u16;

            // Create bootstrappers
            let mut bootstrappers = Vec::new();
            if i > 0 {
                bootstrappers.push((
                    addresses[0].clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
                ));
            }

            // Create network
            let signer = peer.clone();
            let config = Config::test(
                signer.clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                bootstrappers,
                max_message_size,
            );
            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

            // Register peers
            oracle.register(0, addresses.clone()).await;

            // Register basic application
            let (mut sender, mut receiver) =
                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);

            // Wait to connect to all peers, and then send messages to everyone
            network.start();

            // Send/Receive messages
            context.with_label("agent").spawn({
                let mut complete_sender = complete_sender.clone();
                let addresses = addresses.clone();
                move |context| async move {
                    // Wait for all peers to send their identity
                    context.with_label("receiver").spawn(move |_| async move {
                        // Wait for all peers to send their identity
                        let mut received = HashSet::new();
                        while received.len() < n - 1 {
                            // Ensure message equals sender identity
                            let (sender, message) = receiver.recv().await.unwrap();
                            assert_eq!(sender.as_ref(), message.as_ref());

                            // Add to received set
                            received.insert(sender);
                        }
                        complete_sender.send(()).await.unwrap();

                        // Process messages until all finished (or else sender loops could get stuck as a peer may drop)
                        loop {
                            receiver.recv().await.unwrap();
                        }
                    });

                    // Send identity to all peers
                    let msg = signer.public_key();
                    context
                        .with_label("sender")
                        .spawn(move |context| async move {
                            // Loop forever to account for unexpected message drops
                            loop {
                                match mode {
                                    Mode::One => {
                                        for (j, recipient) in addresses.iter().enumerate() {
                                            // Don't send message to self
                                            if i == j {
                                                continue;
                                            }

                                            // Loop until success
                                            loop {
                                                let sent = sender
                                                    .send(
                                                        Recipients::One(recipient.clone()),
                                                        msg.to_vec().into(),
                                                        true,
                                                    )
                                                    .await
                                                    .unwrap();
                                                if sent.len() != 1 {
                                                    context.sleep(Duration::from_millis(100)).await;
                                                    continue;
                                                }
                                                assert_eq!(&sent[0], recipient);
                                                break;
                                            }
                                        }
                                    }
                                    Mode::Some => {
                                        // Get all peers not including self
                                        let mut recipients = addresses.clone();
                                        recipients.remove(i);
                                        recipients.sort();

                                        // Loop until all peer sends successful
                                        loop {
                                            let mut sent = sender
                                                .send(
                                                    Recipients::Some(recipients.clone()),
                                                    msg.to_vec().into(),
                                                    true,
                                                )
                                                .await
                                                .unwrap();
                                            if sent.len() != n - 1 {
                                                context.sleep(Duration::from_millis(100)).await;
                                                continue;
                                            }

                                            // Compare to expected
                                            sent.sort();
                                            assert_eq!(sent, recipients);
                                            break;
                                        }
                                    }
                                    Mode::All => {
                                        // Get all peers not including self
                                        let mut recipients = addresses.clone();
                                        recipients.remove(i);
                                        recipients.sort();

                                        // Loop until all peer sends successful
                                        loop {
                                            let mut sent = sender
                                                .send(Recipients::All, msg.to_vec().into(), true)
                                                .await
                                                .unwrap();
                                            if sent.len() != n - 1 {
                                                context.sleep(Duration::from_millis(100)).await;
                                                continue;
                                            }

                                            // Compare to expected
                                            sent.sort();
                                            assert_eq!(sent, recipients);
                                            break;
                                        }
                                    }
                                };

                                // Sleep to avoid busy loop
                                context.sleep(Duration::from_secs(10)).await;
                            }
                        });
                }
            });
        }

        // Wait for all peers to finish
        for _ in 0..n {
            complete_receiver.next().await.unwrap();
        }

        // Ensure no message rate limiting occurred
        assert_no_rate_limiting(&context);
    }

    fn run_deterministic_test(seed: u64, mode: Mode) {
        // Configure test
        const MAX_MESSAGE_SIZE: usize = 1_024 * 1_024; // 1MB
        const NUM_PEERS: usize = 25;
        const BASE_PORT: u16 = 3000;

        // Run first instance
        let executor = deterministic::Runner::seeded(seed);
        let state = executor.start(|context| async move {
            run_network(
                context.clone(),
                MAX_MESSAGE_SIZE,
                BASE_PORT,
                NUM_PEERS,
                mode,
            )
            .await;
            context.auditor().state()
        });

        // Compare result to second instance
        let executor = deterministic::Runner::seeded(seed);
        let state2 = executor.start(|context| async move {
            run_network(
                context.clone(),
                MAX_MESSAGE_SIZE,
                BASE_PORT,
                NUM_PEERS,
                mode,
            )
            .await;
            context.auditor().state()
        });
        assert_eq!(state, state2);
    }

    #[test_traced]
    #[ignore]
    fn test_determinism_one() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::One);
        }
    }

    #[test_traced]
    #[ignore]
    fn test_determinism_some() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::Some);
        }
    }

    #[test_traced]
    #[ignore]
    fn test_determinism_all() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::All);
        }
    }

    #[test_traced]
    fn test_tokio_connectivity() {
        let cfg = tokio::Config::default();
        let executor = tokio::Runner::new(cfg.clone());
        executor.start(|context| async move {
            const MAX_MESSAGE_SIZE: usize = 1_024 * 1_024; // 1MB
            let base_port = 3000;
            let n = 10;
            run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
        });
    }

    #[test_traced]
    fn test_multi_index_oracle() {
        // Configure test
        let base_port = 3000;
        let n: usize = 100;

        // Initialize context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Create networks
            let mut waiters = Vec::new();
            for (i, peer) in peers.iter().enumerate() {
                // Create peer context
                let context = context.with_label(&format!("peer-{i}"));

                // Derive port
                let port = base_port + i as u16;

                // Create bootstrappers
                let mut bootstrappers = Vec::new();
                if i > 0 {
                    bootstrappers.push((
                        addresses[0].clone(),
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
                    ));
                }

                // Create network
                let signer = peer.clone();
                let config = Config::test(
                    signer.clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                    bootstrappers,
                    1_024 * 1_024, // 1MB
                );
                let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

                // Register peers at separate indices
                oracle.register(0, vec![addresses[0].clone()]).await;
                oracle
                    .register(1, vec![addresses[1].clone(), addresses[2].clone()])
                    .await;
                oracle
                    .register(2, addresses.iter().skip(2).cloned().collect())
                    .await;

                // Register basic application
                let (mut sender, mut receiver) =
                    network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);

                // Wait to connect to all peers, and then send messages to everyone
                network.start();

                // Send/Receive messages
                let handler = context
                    .with_label("agent")
                    .spawn(move |context| async move {
                        if i == 0 {
                            // Loop until success
                            let msg = signer.public_key();
                            loop {
                                if sender
                                    .send(Recipients::All, msg.to_vec().into(), true)
                                    .await
                                    .unwrap()
                                    .len()
                                    == n - 1
                                {
                                    break;
                                }

                                // Sleep and try again (avoid busy loop)
                                context.sleep(Duration::from_millis(100)).await;
                            }
                        } else {
                            // Ensure message equals sender identity
                            let (sender, message) = receiver.recv().await.unwrap();
                            assert_eq!(sender.as_ref(), message.as_ref());
                        }
                    });

                // Add to waiters
                waiters.push(handler);
            }

            // Wait for waiters to finish (receiver before sender)
            for waiter in waiters.into_iter().rev() {
                waiter.await.unwrap();
            }

            // Ensure no message rate limiting occurred
            assert_no_rate_limiting(&context);
        });
    }

    #[test_traced]
    fn test_message_too_large() {
        // Configure test
        let base_port = 3000;
        let n: usize = 2;

        // Initialize context
        let executor = deterministic::Runner::seeded(0);
        executor.start(|mut context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Create network
            let signer = peers[0].clone();
            let config = Config::test(
                signer.clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
                Vec::new(),
                1_024 * 1_024, // 1MB
            );
            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

            // Register peers
            oracle.register(0, addresses.clone()).await;

            // Register basic application
            let (mut sender, _) =
                network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);

            // Wait to connect to all peers, and then send messages to everyone
            network.start();

            // Create random message
            let mut msg = vec![0u8; 10 * 1024 * 1024]; // 10MB (greater than frame capacity)
            context.fill(&mut msg[..]);

            // Send message
            let recipient = Recipients::One(addresses[1].clone());
            let result = sender.send(recipient, msg.into(), true).await;
            assert!(matches!(result, Err(Error::MessageTooLarge(_))));
        });
    }

    #[test_traced]
    #[should_panic(expected = "no messages should be rate limited")]
    fn test_rate_limiting() {
        // Configure test
        let base_port = 3000;
        let n: usize = 2;

        // Initialize context
        let executor = deterministic::Runner::seeded(0);
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
            let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
            let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);

            // Create network for peer 0
            let signer0 = peers[0].clone();
            let config0 = Config::test(
                signer0.clone(),
                socket0,
                vec![(peers[1].public_key(), socket1)],
                1_024 * 1_024, // 1MB
            );
            let (mut network0, mut oracle0) = Network::new(context.with_label("peer-0"), config0);
            oracle0.register(0, addresses.clone()).await;
            let (mut sender0, _receiver0) =
                network0.register(0, Quota::per_hour(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
            network0.start();

            // Create network for peer 1
            let signer1 = peers[1].clone();
            let config1 = Config::test(
                signer1.clone(),
                socket1,
                vec![(peers[0].public_key(), socket0)],
                1_024 * 1_024, // 1MB
            );
            let (mut network1, mut oracle1) = Network::new(context.with_label("peer-1"), config1);
            oracle1.register(0, addresses.clone()).await;
            let (_sender1, _receiver1) =
                network1.register(0, Quota::per_hour(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
            network1.start();

            // Send first message, which should be allowed and consume the quota.
            let msg = vec![0u8; 1024]; // 1KB
            loop {
                // Confirm message is sent to peer
                let sent = sender0
                    .send(
                        Recipients::One(addresses[1].clone()),
                        msg.clone().into(),
                        true,
                    )
                    .await
                    .unwrap();
                if !sent.is_empty() {
                    break;
                }

                // Sleep and try again (avoid busy loop)
                context.sleep(Duration::from_millis(100)).await;
            }

            // Immediately send the second message to trigger the rate limit.
            let sent = sender0
                .send(Recipients::One(addresses[1].clone()), msg.into(), true)
                .await
                .unwrap();
            assert!(!sent.is_empty());

            // Loop until the metrics reflect the rate-limited message.
            for _ in 0..10 {
                assert_no_rate_limiting(&context);
                context.sleep(Duration::from_millis(100)).await;
            }
        });
    }
}