commonware_p2p/authenticated/discovery/
mod.rs

1//! Communicate with a fixed set of authenticated peers without known addresses over encrypted connections.
2//!
3//! `discovery` provides multiplexed communication between fully-connected peers
4//! identified by a developer-specified cryptographic identity (i.e. BLS, ed25519, etc.).
5//! Peer discovery occurs automatically using ordered bit vectors (sorted by authorized
6//! cryptographic identities) to efficiently communicate knowledge of dialable peers.
7//!
8//! # Features
9//!
10//! - Configurable Cryptography Scheme for Peer Identities (BLS, ed25519, etc.)
11//! - Automatic Peer Discovery Using Bit Vectors (Also Used as Ping Messages)
12//! - Multiplexing With Configurable Rate Limiting Per Channel and Send Prioritization
13//!
14//! # Design
15//!
16//! ## Discovery
17//!
18//! Peer discovery operates under the assumption that all peers are aware of and synchronized on
19//! the composition of peer sets at specific, user-provided indices (`u64`). Each index maps to a
20//! list of authorized `PublicKey`s (`(u64, Vec<PublicKey>)`). Based on this shared knowledge, each
21//! peer can construct a sorted bit vector message (`BitVec`) representing its knowledge of the
22//! dialable addresses [std::net::SocketAddr] for the peers in that set.
23//! The `BitVec` message contains:
24//! - `index`: The `u64` index the bit vector applies to.
25//! - `bits`: The bit vector itself, where a '1' signifies knowledge of the corresponding
26//!   peer's address in the sorted list for that index.
27//!
28//! Even if a peer has some knowledge of the address of a peer in the set, if they fail to dial that
29//! peer a (configurable) number of times, they will assume that information is stale and mark it as
30//! '0' (unknown) in the bit vector. This is done to prevent peers from being "stuck" with outdated
31//! information about other peers.
32//!
33//! _Warning: If peers are not synchronized on the peer set composition at a given index,
34//! discovery messages can be misinterpreted. A peer might associate a bit vector index with the
35//! wrong peer or fail to parse the vector if its length doesn't match the expected set size. The
36//! application layer is responsible for ensuring peer set synchronization._
37//!
38//! Due to their small size, these `BitVec` messages are exchanged periodically (configured by
39//! `gossip_bit_vec_frequency` in the [Config]) between connected peers. This serves as both a
40//! peer discovery mechanism and a keep-alive "ping" message to maintain the underlying
41//! connection, especially during periods of low application-level traffic. The protocol supports
42//! tracking multiple peer sets concurrently (up to `tracked_peer_sets`), each identified by its
43//! `index`. This is useful, for instance, during transitions like distributed key generation
44//! (DKG) where connections to both old and new peer sets are needed simultaneously.
45//!
46//! Upon receiving a `BitVec` message, a peer compares it against its own knowledge for the same
47//! index. If the receiving peer knows addresses that the sender marked as '0' (unknown), it
48//! selects a random subset of these known `Info` structures (up to `peer_gossip_max_count`)
49//! and sends them back in a `Payload::Peers` message. To save bandwidth, peers will only gossip
50//! `Info` for peers that they currently have a connection with. This prevents them from
51//! repeatedly sending `Info` that they cannot verify is still valid. Each `Info` contains:
52//! - `socket`: The [std::net::SocketAddr] of the peer.
53//! - `timestamp`: A `u64` timestamp indicating when the address was attested.
54//! - `public_key`: The peer's public key.
55//! - `signature`: The peer's cryptographic signature over the `socket` and `timestamp`.
56//!
57//! If the receiver doesn't know any addresses the sender is unaware of, it sends no
58//! `Payload::Peers` response; the received `BitVec` implicitly acts as a "pong".
59//!
60//! If a peer receives a `Info` message (either directly or through gossip) containing a more
61//! recent timestamp for a known peer's address, it updates its local `Record`. This updated
62//! `Info` is also used in future gossip messages. Each peer generates its own signed
63//! `Info` upon startup and sends it immediately after establishing a connection (following
64//! the cryptographic handshake). This ensures that if a peer connects using an outdated address
65//! record, it will be corrected promptly by the peer being dialed.
66//!
67//! To initiate the discovery process, a peer needs a list of `bootstrappers` (defined in
68//! [Config]) - known peer public keys and their corresponding socket addresses. The peer
69//! attempts to dial these bootstrappers, performs the handshake, sends its own `Info`, and
70//! then sends a `BitVec` for the relevant peer set(s) (initially only knowing its own address,
71//! marked as '1'). It then waits for responses, learning about other peers through the
72//! `Payload::Peers` messages received. Bootstrapper information is persisted, and connections to
73//! them are maintained even if they aren't part of any currently tracked peer sets. Different
74//! peers can have different bootstrapper lists.
75//!
76//! _Note: If a peer (listener) receives a connection request from another peer (dialer) that
77//! belongs to a registered peer set, the listener will accept the connection, even if the
78//! listener itself hasn't yet learned about that specific peer set (or has an older version). The
79//! core requirement is that the listener recognizes the *dialer's public key* as belonging to
80//! *some* authorized set it tracks (see `actors::tracker::Actor`). This mechanism allows peers
81//! with more up-to-date peer set information to connect and propagate that information, enabling
82//! the listener to potentially learn about newer sets it is part of._
83//!
84//! ## Messages
85//!
86//! Application-level data is exchanged using the `Payload::Data` message type, which wraps an
87//! internal `Data` structure. This structure contains:
88//! - `channel`: A `u32` identifier used to route the message to the correct application handler.
89//! - `message`: The arbitrary application payload as `Bytes`.
90//!
91//! The size of the `message` bytes must not exceed the configured
92//! `max_message_size`. If it does, the sending operation will fail with
93//! [Error::MessageTooLarge]. Messages can be sent with `priority`, allowing certain
94//! communications to potentially bypass lower-priority messages waiting in send queues across all
95//! channels. Each registered channel ([Sender], [Receiver]) handles its own message queuing
96//! and rate limiting.
97//!
98//! ## Compression
99//!
100//! Stream compression is not provided at the transport layer to avoid inadvertently
101//! enabling known attacks such as BREACH and CRIME. These attacks exploit the interaction
102//! between compression and encryption by analyzing patterns in the resulting data.
103//! By compressing secrets alongside attacker-controlled content, these attacks can infer
104//! sensitive information through compression ratio analysis. Applications that choose
105//! to compress data should do so with full awareness of these risks and implement
106//! appropriate mitigations (such as ensuring no attacker-controlled data is compressed
107//! alongside sensitive information).
108//!
109//! ## Rate Limiting
110//!
111//! There are five primary rate limits:
112//!
113//! - `max_concurrent_handshakes`: The maximum number of concurrent handshake attempts allowed.
114//! - `allowed_handshake_rate_per_ip`: The rate limit for handshake attempts originating from a single IP address.
115//! - `allowed_handshake_rate_per_subnet`: The rate limit for handshake attempts originating from a single IP subnet.
116//! - `allowed_connection_rate_per_peer`: The rate limit for connections to a single peer (incoming or outgoing).
117//! - `rate` (per channel): The rate limit for messages sent on a single channel.
118//!
119//! _Users should consider these rate limits as best-effort protection against moderate abuse. Targeted abuse (e.g. DDoS)
120//! must be mitigated with an external proxy (that limits inbound connection attempts to authorized IPs)._
121//!
122//! # Example
123//!
124//! ```rust
125//! use commonware_p2p::{authenticated::discovery::{self, Network}, Manager, Sender, Recipients};
126//! use commonware_cryptography::{ed25519, Signer, PrivateKey as _, PublicKey as _, PrivateKeyExt as _};
127//! use commonware_runtime::{deterministic, Spawner, Runner, Metrics};
128//! use commonware_utils::NZU32;
129//! use governor::Quota;
130//! use std::net::{IpAddr, Ipv4Addr, SocketAddr};
131//!
132//! // Configure context
133//! let runtime_cfg = deterministic::Config::default();
134//! let runner = deterministic::Runner::new(runtime_cfg.clone());
135//!
136//! // Generate identity
137//! //
138//! // In production, the signer should be generated from a secure source of entropy.
139//! let signer = ed25519::PrivateKey::from_seed(0);
140//!
141//! // Generate peers
142//! //
143//! // In production, peer identities will be provided by some external source of truth
144//! // (like the staking set of a blockchain).
145//! let peer1 = ed25519::PrivateKey::from_seed(1).public_key();
146//! let peer2 = ed25519::PrivateKey::from_seed(2).public_key();
147//! let peer3 = ed25519::PrivateKey::from_seed(3).public_key();
148//!
149//! // Configure bootstrappers
150//! //
151//! // In production, it is likely that the address of bootstrappers will be some public address.
152//! let bootstrappers = vec![(peer1.clone(), SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3001))];
153//!
154//! // Configure namespace
155//! //
156//! // In production, use a unique application namespace to prevent cryptographic replay attacks.
157//! let application_namespace = b"my-app-namespace";
158//!
159//! // Configure network
160//! //
161//! // In production, use a more conservative configuration like `Config::recommended`.
162//! const MAX_MESSAGE_SIZE: usize = 1_024; // 1KB
163//! let p2p_cfg = discovery::Config::local(
164//!     signer.clone(),
165//!     application_namespace,
166//!     SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 3000),
167//!     SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3000), // Use a specific dialable addr
168//!     bootstrappers,
169//!     MAX_MESSAGE_SIZE,
170//! );
171//!
172//! // Start context
173//! runner.start(|context| async move {
174//!     // Initialize network
175//!     let (mut network, mut oracle) = Network::new(context.with_label("network"), p2p_cfg);
176//!
177//!     // Register authorized peers
178//!     //
179//!     // In production, this would be updated as new peer sets are created (like when
180//!     // the composition of a validator set changes).
181//!     oracle.update(0, vec![signer.public_key(), peer1, peer2, peer3].into()).await;
182//!
183//!     // Register some channel
184//!     const MAX_MESSAGE_BACKLOG: usize = 128;
185//!     let (mut sender, receiver) = network.register(
186//!         0,
187//!         Quota::per_second(NZU32!(1)),
188//!         MAX_MESSAGE_BACKLOG,
189//!     );
190//!
191//!     // Run network
192//!     let network_handler = network.start();
193//!
194//!     // Example: Use sender
195//!     let _ = sender.send(Recipients::All, bytes::Bytes::from_static(b"hello"), false).await;
196//!
197//!     // Shutdown network
198//!     network_handler.abort();
199//! });
200//! ```
201
202mod actors;
203mod channels;
204mod config;
205mod metrics;
206mod network;
207mod types;
208
209use thiserror::Error;
210
211/// Errors that can occur when interacting with the network.
212#[derive(Error, Debug)]
213pub enum Error {
214    #[error("message too large: {0}")]
215    MessageTooLarge(usize),
216    #[error("network closed")]
217    NetworkClosed,
218}
219
220pub use actors::tracker::Oracle;
221pub use channels::{Receiver, Sender};
222pub use config::{Bootstrapper, Config};
223pub use network::Network;
224
225#[cfg(test)]
226mod tests {
227    use super::*;
228    use crate::{Manager, Receiver, Recipients, Sender};
229    use commonware_cryptography::{ed25519, PrivateKeyExt as _, Signer as _};
230    use commonware_macros::{select, test_traced};
231    use commonware_runtime::{
232        deterministic, tokio, Clock, Metrics, Network as RNetwork, Runner, Spawner,
233    };
234    use commonware_utils::{set::Ordered, NZU32};
235    use futures::{channel::mpsc, SinkExt, StreamExt};
236    use governor::{clock::ReasonablyRealtime, Quota};
237    use rand::{CryptoRng, Rng};
238    use std::{
239        collections::HashSet,
240        net::{IpAddr, Ipv4Addr, SocketAddr},
241        time::Duration,
242    };
243
244    #[derive(Copy, Clone)]
245    enum Mode {
246        All,
247        Some,
248        One,
249    }
250
251    const DEFAULT_MESSAGE_BACKLOG: usize = 128;
252
253    /// Ensure no message rate limiting occurred.
254    ///
255    /// If a message is rate limited, it would be formatted as:
256    ///
257    /// ```text
258    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="data_0"} 1
259    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="ping"} 1
260    /// ```
261    fn assert_no_rate_limiting(context: &impl Metrics) {
262        let metrics = context.encode();
263        assert!(
264            !metrics.contains("messages_rate_limited_total{"),
265            "no messages should be rate limited: {metrics}"
266        );
267    }
268
269    /// Test connectivity between `n` peers.
270    ///
271    /// We set a unique `base_port` for each test to avoid "address already in use"
272    /// errors when tests are run immediately after each other.
273    async fn run_network(
274        context: impl Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics,
275        max_message_size: usize,
276        base_port: u16,
277        n: usize,
278        mode: Mode,
279    ) {
280        // Create peers
281        let mut peers = Vec::new();
282        for i in 0..n {
283            peers.push(ed25519::PrivateKey::from_seed(i as u64));
284        }
285        let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
286
287        // Create networks
288        let (complete_sender, mut complete_receiver) = mpsc::channel(peers.len());
289        for (i, peer) in peers.iter().enumerate() {
290            // Create peer context
291            let context = context.with_label(&format!("peer-{i}"));
292
293            // Derive port
294            let port = base_port + i as u16;
295
296            // Create bootstrappers
297            let mut bootstrappers = Vec::new();
298            if i > 0 {
299                bootstrappers.push((
300                    addresses[0].clone(),
301                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
302                ));
303            }
304
305            // Create network
306            let signer = peer.clone();
307            let config = Config::test(
308                signer.clone(),
309                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
310                bootstrappers,
311                max_message_size,
312            );
313            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
314
315            // Register peers
316            oracle.update(0, addresses.clone().into()).await;
317
318            // Register basic application
319            let (mut sender, mut receiver) =
320                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
321
322            // Wait to connect to all peers, and then send messages to everyone
323            network.start();
324
325            // Send/Receive messages
326            context.with_label("agent").spawn({
327                let mut complete_sender = complete_sender.clone();
328                let addresses = addresses.clone();
329                move |context| async move {
330                    // Wait for all peers to send their identity
331                    let receiver = context.with_label("receiver").spawn(move |_| async move {
332                        // Wait for all peers to send their identity
333                        let mut received = HashSet::new();
334                        while received.len() < n - 1 {
335                            // Ensure message equals sender identity
336                            let (sender, message) = receiver.recv().await.unwrap();
337                            assert_eq!(sender.as_ref(), message.as_ref());
338
339                            // Add to received set
340                            received.insert(sender);
341                        }
342                        complete_sender.send(()).await.unwrap();
343
344                        // Process messages until all finished (or else sender loops could get stuck as a peer may drop)
345                        loop {
346                            receiver.recv().await.unwrap();
347                        }
348                    });
349
350                    // Send identity to all peers
351                    let msg = signer.public_key();
352                    let sender = context
353                        .with_label("sender")
354                        .spawn(move |context| async move {
355                            // Loop forever to account for unexpected message drops
356                            loop {
357                                match mode {
358                                    Mode::One => {
359                                        for (j, recipient) in addresses.iter().enumerate() {
360                                            // Don't send message to self
361                                            if i == j {
362                                                continue;
363                                            }
364
365                                            // Loop until success
366                                            loop {
367                                                let sent = sender
368                                                    .send(
369                                                        Recipients::One(recipient.clone()),
370                                                        msg.to_vec().into(),
371                                                        true,
372                                                    )
373                                                    .await
374                                                    .unwrap();
375                                                if sent.len() != 1 {
376                                                    context.sleep(Duration::from_millis(100)).await;
377                                                    continue;
378                                                }
379                                                assert_eq!(&sent[0], recipient);
380                                                break;
381                                            }
382                                        }
383                                    }
384                                    Mode::Some => {
385                                        // Get all peers not including self
386                                        let mut recipients = addresses.clone();
387                                        recipients.remove(i);
388                                        recipients.sort();
389
390                                        // Loop until all peer sends successful
391                                        loop {
392                                            let mut sent = sender
393                                                .send(
394                                                    Recipients::Some(recipients.clone()),
395                                                    msg.to_vec().into(),
396                                                    true,
397                                                )
398                                                .await
399                                                .unwrap();
400                                            if sent.len() != n - 1 {
401                                                context.sleep(Duration::from_millis(100)).await;
402                                                continue;
403                                            }
404
405                                            // Compare to expected
406                                            sent.sort();
407                                            assert_eq!(sent, recipients);
408                                            break;
409                                        }
410                                    }
411                                    Mode::All => {
412                                        // Get all peers not including self
413                                        let mut recipients = addresses.clone();
414                                        recipients.remove(i);
415                                        recipients.sort();
416
417                                        // Loop until all peer sends successful
418                                        loop {
419                                            let mut sent = sender
420                                                .send(Recipients::All, msg.to_vec().into(), true)
421                                                .await
422                                                .unwrap();
423                                            if sent.len() != n - 1 {
424                                                context.sleep(Duration::from_millis(100)).await;
425                                                continue;
426                                            }
427
428                                            // Compare to expected
429                                            sent.sort();
430                                            assert_eq!(sent, recipients);
431                                            break;
432                                        }
433                                    }
434                                };
435
436                                // Sleep to avoid busy loop
437                                context.sleep(Duration::from_secs(10)).await;
438                            }
439                        });
440
441                    // Neither task should exit
442                    select! {
443                        receiver = receiver => {
444                            panic!("receiver exited: {receiver:?}");
445                        },
446                        sender = sender => {
447                            panic!("sender exited: {sender:?}");
448                        },
449                    }
450                }
451            });
452        }
453
454        // Wait for all peers to finish
455        for _ in 0..n {
456            complete_receiver.next().await.unwrap();
457        }
458
459        // Ensure no message rate limiting occurred
460        assert_no_rate_limiting(&context);
461    }
462
463    fn run_deterministic_test(seed: u64, mode: Mode) {
464        // Configure test
465        const MAX_MESSAGE_SIZE: usize = 1_024 * 1_024; // 1MB
466        const NUM_PEERS: usize = 25;
467        const BASE_PORT: u16 = 3000;
468
469        // Run first instance
470        let executor = deterministic::Runner::seeded(seed);
471        let state = executor.start(|context| async move {
472            run_network(
473                context.clone(),
474                MAX_MESSAGE_SIZE,
475                BASE_PORT,
476                NUM_PEERS,
477                mode,
478            )
479            .await;
480            context.auditor().state()
481        });
482
483        // Compare result to second instance
484        let executor = deterministic::Runner::seeded(seed);
485        let state2 = executor.start(|context| async move {
486            run_network(
487                context.clone(),
488                MAX_MESSAGE_SIZE,
489                BASE_PORT,
490                NUM_PEERS,
491                mode,
492            )
493            .await;
494            context.auditor().state()
495        });
496        assert_eq!(state, state2);
497    }
498
499    #[test_traced]
500    #[ignore]
501    fn test_determinism_one() {
502        for i in 0..10 {
503            run_deterministic_test(i, Mode::One);
504        }
505    }
506
507    #[test_traced]
508    #[ignore]
509    fn test_determinism_some() {
510        for i in 0..10 {
511            run_deterministic_test(i, Mode::Some);
512        }
513    }
514
515    #[test_traced]
516    #[ignore]
517    fn test_determinism_all() {
518        for i in 0..10 {
519            run_deterministic_test(i, Mode::All);
520        }
521    }
522
523    #[test_traced]
524    fn test_tokio_connectivity() {
525        let executor = tokio::Runner::default();
526        executor.start(|context| async move {
527            const MAX_MESSAGE_SIZE: usize = 1_024 * 1_024; // 1MB
528            let base_port = 3000;
529            let n = 10;
530            run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
531        });
532    }
533
534    #[test_traced]
535    fn test_multi_index_oracle() {
536        // Configure test
537        let base_port = 3000;
538        let n: usize = 100;
539
540        // Initialize context
541        let executor = deterministic::Runner::default();
542        executor.start(|context| async move {
543            // Create peers
544            let mut peers = Vec::new();
545            for i in 0..n {
546                peers.push(ed25519::PrivateKey::from_seed(i as u64));
547            }
548            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
549
550            // Create networks
551            let mut waiters = Vec::new();
552            for (i, peer) in peers.iter().enumerate() {
553                // Create peer context
554                let context = context.with_label(&format!("peer-{i}"));
555
556                // Derive port
557                let port = base_port + i as u16;
558
559                // Create bootstrappers
560                let mut bootstrappers = Vec::new();
561                if i > 0 {
562                    bootstrappers.push((
563                        addresses[0].clone(),
564                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
565                    ));
566                }
567
568                // Create network
569                let signer = peer.clone();
570                let config = Config::test(
571                    signer.clone(),
572                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
573                    bootstrappers,
574                    1_024 * 1_024, // 1MB
575                );
576                let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
577
578                // Register peers at separate indices
579                oracle
580                    .update(0, Ordered::from([addresses[0].clone()]))
581                    .await;
582                oracle
583                    .update(
584                        1,
585                        Ordered::from([addresses[1].clone(), addresses[2].clone()]),
586                    )
587                    .await;
588                oracle
589                    .update(2, addresses.iter().skip(2).cloned().collect())
590                    .await;
591
592                // Register basic application
593                let (mut sender, mut receiver) =
594                    network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
595
596                // Wait to connect to all peers, and then send messages to everyone
597                network.start();
598
599                // Send/Receive messages
600                let handler = context
601                    .with_label("agent")
602                    .spawn(move |context| async move {
603                        if i == 0 {
604                            // Loop until success
605                            let msg = signer.public_key();
606                            loop {
607                                if sender
608                                    .send(Recipients::All, msg.to_vec().into(), true)
609                                    .await
610                                    .unwrap()
611                                    .len()
612                                    == n - 1
613                                {
614                                    break;
615                                }
616
617                                // Sleep and try again (avoid busy loop)
618                                context.sleep(Duration::from_millis(100)).await;
619                            }
620                        } else {
621                            // Ensure message equals sender identity
622                            let (sender, message) = receiver.recv().await.unwrap();
623                            assert_eq!(sender.as_ref(), message.as_ref());
624                        }
625                    });
626
627                // Add to waiters
628                waiters.push(handler);
629            }
630
631            // Wait for waiters to finish (receiver before sender)
632            for waiter in waiters.into_iter().rev() {
633                waiter.await.unwrap();
634            }
635
636            // Ensure no message rate limiting occurred
637            assert_no_rate_limiting(&context);
638        });
639    }
640
641    #[test_traced]
642    fn test_message_too_large() {
643        // Configure test
644        let base_port = 3000;
645        let n: usize = 2;
646
647        // Initialize context
648        let executor = deterministic::Runner::seeded(0);
649        executor.start(|mut context| async move {
650            // Create peers
651            let mut peers = Vec::new();
652            for i in 0..n {
653                peers.push(ed25519::PrivateKey::from_seed(i as u64));
654            }
655            let addresses = peers.iter().map(|p| p.public_key()).collect::<Ordered<_>>();
656
657            // Create network
658            let signer = peers[0].clone();
659            let config = Config::test(
660                signer.clone(),
661                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
662                Vec::new(),
663                1_024 * 1_024, // 1MB
664            );
665            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
666
667            // Register peers
668            oracle.update(0, addresses.clone()).await;
669
670            // Register basic application
671            let (mut sender, _) =
672                network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
673
674            // Wait to connect to all peers, and then send messages to everyone
675            network.start();
676
677            // Crate random message
678            let mut msg = vec![0u8; 10 * 1024 * 1024]; // 10MB (greater than frame capacity)
679            context.fill(&mut msg[..]);
680
681            // Send message
682            let recipient = Recipients::One(addresses[1].clone());
683            let result = sender.send(recipient, msg.into(), true).await;
684            assert!(matches!(result, Err(Error::MessageTooLarge(_))));
685        });
686    }
687
688    #[test_traced]
689    #[should_panic(expected = "no messages should be rate limited")]
690    fn test_rate_limiting() {
691        // Configure test
692        let base_port = 3000;
693        let n: usize = 2;
694
695        // Initialize context
696        let executor = deterministic::Runner::seeded(0);
697        executor.start(|context| async move {
698            // Create peers
699            let mut peers = Vec::new();
700            for i in 0..n {
701                peers.push(ed25519::PrivateKey::from_seed(i as u64));
702            }
703            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
704            let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
705            let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
706
707            // Create network for peer 0
708            let signer0 = peers[0].clone();
709            let config0 = Config::test(
710                signer0.clone(),
711                socket0,
712                vec![(peers[1].public_key(), socket1)],
713                1_024 * 1_024, // 1MB
714            );
715            let (mut network0, mut oracle0) = Network::new(context.with_label("peer-0"), config0);
716            oracle0.update(0, addresses.clone().into()).await;
717            let (mut sender0, _receiver0) =
718                network0.register(0, Quota::per_hour(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
719            network0.start();
720
721            // Create network for peer 1
722            let signer1 = peers[1].clone();
723            let config1 = Config::test(
724                signer1.clone(),
725                socket1,
726                vec![(peers[0].public_key(), socket0)],
727                1_024 * 1_024, // 1MB
728            );
729            let (mut network1, mut oracle1) = Network::new(context.with_label("peer-1"), config1);
730            oracle1.update(0, addresses.clone().into()).await;
731            let (_sender1, _receiver1) =
732                network1.register(0, Quota::per_hour(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
733            network1.start();
734
735            // Send first message, which should be allowed and consume the quota.
736            let msg = vec![0u8; 1024]; // 1KB
737            loop {
738                // Confirm message is sent to peer
739                let sent = sender0
740                    .send(
741                        Recipients::One(addresses[1].clone()),
742                        msg.clone().into(),
743                        true,
744                    )
745                    .await
746                    .unwrap();
747                if !sent.is_empty() {
748                    break;
749                }
750
751                // Sleep and try again (avoid busy loop)
752                context.sleep(Duration::from_millis(100)).await;
753            }
754
755            // Immediately send the second message to trigger the rate limit.
756            let sent = sender0
757                .send(Recipients::One(addresses[1].clone()), msg.into(), true)
758                .await
759                .unwrap();
760            assert!(!sent.is_empty());
761
762            // Loop until the metrics reflect the rate-limited message.
763            for _ in 0..10 {
764                assert_no_rate_limiting(&context);
765                context.sleep(Duration::from_millis(100)).await;
766            }
767        });
768    }
769}