commonware_p2p/authenticated/discovery/mod.rs
//! Communicate with a fixed set of authenticated peers without known addresses over encrypted connections.
//!
//! `discovery` provides multiplexed communication between fully-connected peers
//! identified by a developer-specified cryptographic identity (e.g., BLS or ed25519).
//! Peer discovery occurs automatically using ordered bit vectors (sorted by authorized
//! cryptographic identities) to efficiently communicate knowledge of dialable peers.
//!
//! # Features
//!
//! - Configurable Cryptography Scheme for Peer Identities (BLS, ed25519, etc.)
//! - Automatic Peer Discovery Using Bit Vectors (Also Used as Ping Messages)
//! - Multiplexing With Configurable Rate Limiting Per Channel and Send Prioritization
//!
//! # Design
//!
//! ## Discovery
//!
//! Peer discovery operates under the assumption that all peers are aware of and synchronized on
//! the composition of peer sets at specific, user-provided indices (`u64`). Each index maps to a
//! list of authorized `PublicKey`s (`(u64, Vec<PublicKey>)`). Based on this shared knowledge, each
//! peer can construct a sorted bit vector message (`BitVec`) representing its knowledge of the
//! dialable addresses ([std::net::SocketAddr]) of the peers in that set.
//! The `BitVec` message contains:
//! - `index`: The `u64` index the bit vector applies to.
//! - `bits`: The bit vector itself, where a '1' signifies knowledge of the corresponding
//!   peer's address in the sorted list for that index.
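/*!
As a rough illustration of the bit vector described above, the following sketch derives `bits`
for a single peer set. It is not the crate's implementation: `PeerId` and `knowledge_bits` are
hypothetical names, and a fixed-size byte array stands in for `PublicKey`.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;

/// Hypothetical stand-in for a peer's public key (not the crate's `PublicKey` type).
type PeerId = [u8; 32];

/// Build the bits for one peer set: bit `i` is `true` when a dialable address
/// is known for the `i`-th peer in sorted order.
fn knowledge_bits(peer_set: &[PeerId], known: &HashMap<PeerId, SocketAddr>) -> Vec<bool> {
    // Sort a copy so that every participant derives the same ordering.
    let mut sorted = peer_set.to_vec();
    sorted.sort();

    // One bit per peer, in sorted order.
    sorted.iter().map(|peer| known.contains_key(peer)).collect()
}
```

Because every peer sorts the same set the same way, position `i` refers to the same identity
on both ends of a connection, which is why the peer sets must be synchronized.
*/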
//!
//! Even if a peer knows the address of another peer in the set, if it fails to dial that
//! peer a (configurable) number of times, it assumes that information is stale and marks it as
//! '0' (unknown) in the bit vector. This prevents peers from being "stuck" with outdated
//! information about other peers.
//!
//! _Warning: If peers are not synchronized on the peer set composition at a given index,
//! discovery messages can be misinterpreted. A peer might associate a bit vector index with the
//! wrong peer or fail to parse the vector if its length doesn't match the expected set size. The
//! application layer is responsible for ensuring peer set synchronization._
//!
//! Due to their small size, these `BitVec` messages are exchanged periodically (configured by
//! `gossip_bit_vec_frequency` in the [Config]) between connected peers. This serves as both a
//! peer discovery mechanism and a keep-alive "ping" message to maintain the underlying
//! connection, especially during periods of low application-level traffic. The protocol supports
//! tracking multiple peer sets concurrently (up to `tracked_peer_sets`), each identified by its
//! `index`. This is useful, for instance, during transitions like distributed key generation
//! (DKG) where connections to both old and new peer sets are needed simultaneously.
//!
//! Upon receiving a `BitVec` message, a peer compares it against its own knowledge for the same
//! index. If the receiving peer knows addresses that the sender marked as '0' (unknown), it
//! selects a random subset of these known `Info` structures (up to `peer_gossip_max_count`)
//! and sends them back in a `Payload::Peers` message. To save bandwidth, peers will only gossip
//! `Info` for peers that they currently have a connection with. This prevents them from
//! repeatedly sending `Info` that they cannot verify is still valid. Each `Info` contains:
//! - `socket`: The [std::net::SocketAddr] of the peer.
//! - `timestamp`: A `u64` timestamp indicating when the address was attested.
//! - `public_key`: The peer's public key.
//! - `signature`: The peer's cryptographic signature over the `socket` and `timestamp`.
//!
//! If the receiver doesn't know any addresses the sender is unaware of, it sends no
//! `Payload::Peers` response; the received `BitVec` implicitly acts as a "pong".
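/*!
The comparison step above can be sketched as follows. This is a simplified, standard-library-only
illustration with hypothetical names (`gossip_candidates`, `sender_bits`, `ours`); `ours` is assumed
to mark only peers that are both known and currently connected. Unlike the behavior described
above, the sketch takes the first `max_count` positions instead of a random subset.

```rust
/// Return the positions (in the sorted peer set) whose `Info` could be sent back:
/// the sender reported '0' and we hold a usable entry at that position.
/// Returns `None` when the lengths differ, since the vector cannot be interpreted.
fn gossip_candidates(
    sender_bits: &[bool],
    ours: &[bool],
    max_count: usize,
) -> Option<Vec<usize>> {
    // A length mismatch means the two sides disagree on the peer set.
    if sender_bits.len() != ours.len() {
        return None;
    }

    // Keep positions the sender lacks and we can supply, capped at `max_count`.
    let picks = sender_bits
        .iter()
        .zip(ours)
        .enumerate()
        .filter(|(_, (theirs, mine))| !**theirs && **mine)
        .map(|(i, _)| i)
        .take(max_count)
        .collect();
    Some(picks)
}
```

An empty result corresponds to the "pong" case: nothing is sent back.
*/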
//!
//! If a peer receives an `Info` message (either directly or through gossip) containing a more
//! recent timestamp for a known peer's address, it updates its local `Record`. This updated
//! `Info` is also used in future gossip messages. Each peer generates its own signed
//! `Info` upon startup and sends it immediately after establishing a connection (following
//! the cryptographic handshake). This ensures that if a peer connects using an outdated address
//! record, it will be corrected promptly by the peer being dialed.
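/*!
The "more recent timestamp wins" rule can be sketched as below. `KnownAddress` and `apply_info`
are hypothetical names used only for illustration; the sketch omits the public key and skips
signature verification entirely, and it assumes "more recent" means strictly greater.

```rust
use std::net::SocketAddr;

/// Hypothetical, simplified view of a stored address (key and signature omitted).
#[derive(Debug, Clone, PartialEq)]
struct KnownAddress {
    socket: SocketAddr,
    timestamp: u64,
}

/// Replace `current` only when `incoming` carries a strictly newer timestamp
/// (or when nothing is stored yet). Returns `true` if the record changed.
fn apply_info(current: &mut Option<KnownAddress>, incoming: KnownAddress) -> bool {
    let is_newer = current
        .as_ref()
        .map_or(true, |existing| incoming.timestamp > existing.timestamp);
    if is_newer {
        *current = Some(incoming);
    }
    is_newer
}
```

Under this rule, replaying an older `Info` cannot roll a record back to a stale address.
*/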
//!
//! To initiate the discovery process, a peer needs a list of `bootstrappers` (defined in
//! [Config]) - known peer public keys and their corresponding socket addresses. The peer
//! attempts to dial these bootstrappers, performs the handshake, sends its own `Info`, and
//! then sends a `BitVec` for the relevant peer set(s) (initially only knowing its own address,
//! marked as '1'). It then waits for responses, learning about other peers through the
//! `Payload::Peers` messages received. Bootstrapper information is persisted, and connections to
//! them are maintained even if they aren't part of any currently tracked peer sets. Different
//! peers can have different bootstrapper lists.
//!
//! _Note: If a peer (listener) receives a connection request from another peer (dialer) that
//! belongs to a registered peer set, the listener will accept the connection, even if the
//! listener itself hasn't yet learned about that specific peer set (or has an older version). The
//! core requirement is that the listener recognizes the *dialer's public key* as belonging to
//! *some* authorized set it tracks (see `actors::tracker::Actor`). This mechanism allows peers
//! with more up-to-date peer set information to connect and propagate that information, enabling
//! the listener to potentially learn about newer sets it is part of._
//!
//! ## Messages
//!
//! Application-level data is exchanged using the `Payload::Data` message type, which wraps an
//! internal `Data` structure. This structure contains:
//! - `channel`: A `u32` identifier used to route the message to the correct application handler.
//! - `message`: The arbitrary application payload as `Bytes`.
//!
//! The size of the `message` bytes must not exceed the configured
//! `max_message_size`. If it does, the sending operation will fail with
//! [Error::MessageTooLarge]. Messages can be sent with `priority`, allowing certain
//! communications to potentially bypass lower-priority messages waiting in send queues across all
//! channels. Each registered channel ([Sender], [Receiver]) handles its own message queuing
//! and rate limiting.
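/*!
To make the size check and the effect of `priority` concrete, here is a minimal sketch of a
two-queue outbox. `Outbox`, `enqueue`, and `pop` are hypothetical names; the crate's real queues
are not shown in this module, and the error value (the offending length) is an assumption.

```rust
use std::collections::VecDeque;

/// Hypothetical outbound queue pair (not the crate's actual data structure).
struct Outbox {
    max_message_size: usize,
    high: VecDeque<Vec<u8>>,
    low: VecDeque<Vec<u8>>,
}

impl Outbox {
    fn new(max_message_size: usize) -> Self {
        Self {
            max_message_size,
            high: VecDeque::new(),
            low: VecDeque::new(),
        }
    }

    /// Reject oversized payloads; otherwise enqueue by priority.
    /// Assumption: the error value is the offending message length.
    fn enqueue(&mut self, message: Vec<u8>, priority: bool) -> Result<(), usize> {
        if message.len() > self.max_message_size {
            return Err(message.len());
        }
        if priority {
            self.high.push_back(message);
        } else {
            self.low.push_back(message);
        }
        Ok(())
    }

    /// Pop the next message to transmit, draining priority messages first.
    fn pop(&mut self) -> Option<Vec<u8>> {
        self.high.pop_front().or_else(|| self.low.pop_front())
    }
}
```

In this sketch a priority message enqueued after a normal one is still popped first, which is
the "bypass" behavior described above in its simplest form.
*/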
//!
//! ## Compression
//!
//! Stream compression is not provided at the transport layer to avoid inadvertently
//! enabling known attacks such as BREACH and CRIME. These attacks exploit the interaction
//! between compression and encryption by analyzing patterns in the resulting data.
//! When secrets are compressed alongside attacker-controlled content, an attacker can infer
//! sensitive information through compression ratio analysis. Applications that choose
//! to compress data should do so with full awareness of these risks and implement
//! appropriate mitigations (such as ensuring no attacker-controlled data is compressed
//! alongside sensitive information).
//!
//! ## Rate Limiting
//!
//! There are five primary rate limits:
//!
//! - `max_concurrent_handshakes`: The maximum number of concurrent handshake attempts allowed.
//! - `allowed_handshake_rate_per_ip`: The rate limit for handshake attempts originating from a single IP address.
//! - `allowed_handshake_rate_per_subnet`: The rate limit for handshake attempts originating from a single IP subnet.
//! - `allowed_connection_rate_per_peer`: The rate limit for connections to a single peer (incoming or outgoing).
//! - `rate` (per channel): The rate limit for messages sent on a single channel.
//!
//! _Users should consider these rate limits as best-effort protection against moderate abuse. Targeted abuse (e.g. DDoS)
//! must be mitigated with an external proxy (that limits inbound connection attempts to authorized IPs)._
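/*!
To give a feel for what a quota such as "one message per interval" means in practice, here is a
minimal token-bucket sketch using only the standard library. It is an illustration under stated
assumptions: `TokenBucket`, `try_acquire`, and the caller-supplied millisecond clock are
hypothetical, and this is not the algorithm used by `governor` or by this crate.

```rust
/// Minimal token bucket: holds up to `burst` tokens and regains one token
/// every `interval_ms` milliseconds.
struct TokenBucket {
    burst: u32,
    tokens: u32,
    interval_ms: u64,
    last_refill_ms: u64,
}

impl TokenBucket {
    fn new(burst: u32, interval_ms: u64, now_ms: u64) -> Self {
        assert!(interval_ms > 0, "interval must be non-zero");
        Self {
            burst,
            tokens: burst,
            interval_ms,
            last_refill_ms: now_ms,
        }
    }

    /// Returns `true` if one action is allowed at `now_ms`, consuming a token.
    fn try_acquire(&mut self, now_ms: u64) -> bool {
        // Credit one token per whole interval elapsed since the last refill.
        let elapsed = now_ms.saturating_sub(self.last_refill_ms);
        let refill = elapsed / self.interval_ms;
        if refill > 0 {
            let credit = refill.min(u64::from(u32::MAX)) as u32;
            self.tokens = self.burst.min(self.tokens.saturating_add(credit));
            self.last_refill_ms += refill * self.interval_ms;
        }

        // Spend a token if one is available.
        if self.tokens == 0 {
            return false;
        }
        self.tokens -= 1;
        true
    }
}
```

With `burst = 1` and a one-second interval, a second attempt made within the same second is
refused, while an attempt one second later succeeds.
*/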
//!
//! # Example
//!
//! ```rust
//! use commonware_p2p::{authenticated::discovery::{self, Network}, Manager, Sender, Recipients};
//! use commonware_cryptography::{ed25519, Signer, PrivateKey as _, PublicKey as _, PrivateKeyExt as _};
//! use commonware_runtime::{deterministic, Spawner, Runner, Metrics};
//! use commonware_utils::NZU32;
//! use governor::Quota;
//! use std::net::{IpAddr, Ipv4Addr, SocketAddr};
//!
//! // Configure context
//! let runtime_cfg = deterministic::Config::default();
//! let runner = deterministic::Runner::new(runtime_cfg.clone());
//!
//! // Generate identity
//! //
//! // In production, the signer should be generated from a secure source of entropy.
//! let signer = ed25519::PrivateKey::from_seed(0);
//!
//! // Generate peers
//! //
//! // In production, peer identities will be provided by some external source of truth
//! // (like the staking set of a blockchain).
//! let peer1 = ed25519::PrivateKey::from_seed(1).public_key();
//! let peer2 = ed25519::PrivateKey::from_seed(2).public_key();
//! let peer3 = ed25519::PrivateKey::from_seed(3).public_key();
//!
//! // Configure bootstrappers
//! //
//! // In production, it is likely that the address of bootstrappers will be some public address.
//! let bootstrappers = vec![(peer1.clone(), SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3001))];
//!
//! // Configure namespace
//! //
//! // In production, use a unique application namespace to prevent cryptographic replay attacks.
//! let application_namespace = b"my-app-namespace";
//!
//! // Configure network
//! //
//! // In production, use a more conservative configuration like `Config::recommended`.
//! const MAX_MESSAGE_SIZE: usize = 1_024; // 1KB
//! let p2p_cfg = discovery::Config::local(
//!     signer.clone(),
//!     application_namespace,
//!     SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 3000),
//!     SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3000), // Use a specific dialable addr
//!     bootstrappers,
//!     MAX_MESSAGE_SIZE,
//! );
//!
//! // Start context
//! runner.start(|context| async move {
//!     // Initialize network
//!     let (mut network, mut oracle) = Network::new(context.with_label("network"), p2p_cfg);
//!
//!     // Register authorized peers
//!     //
//!     // In production, this would be updated as new peer sets are created (like when
//!     // the composition of a validator set changes).
//!     oracle.update(0, vec![signer.public_key(), peer1, peer2, peer3].into()).await;
//!
//!     // Register some channel
//!     const MAX_MESSAGE_BACKLOG: usize = 128;
//!     let (mut sender, receiver) = network.register(
//!         0,
//!         Quota::per_second(NZU32!(1)),
//!         MAX_MESSAGE_BACKLOG,
//!     );
//!
//!     // Run network
//!     let network_handler = network.start();
//!
//!     // Example: Use sender
//!     let _ = sender.send(Recipients::All, bytes::Bytes::from_static(b"hello"), false).await;
//!
//!     // Shutdown network
//!     network_handler.abort();
//! });
//! ```

mod actors;
mod channels;
mod config;
mod metrics;
mod network;
mod types;

use thiserror::Error;

/// Errors that can occur when interacting with the network.
#[derive(Error, Debug)]
pub enum Error {
    #[error("message too large: {0}")]
    MessageTooLarge(usize),
    #[error("network closed")]
    NetworkClosed,
}

pub use actors::tracker::Oracle;
pub use channels::{Receiver, Sender};
pub use config::{Bootstrapper, Config};
pub use network::Network;

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Manager, Receiver, Recipients, Sender};
    use commonware_cryptography::{ed25519, PrivateKeyExt as _, Signer as _};
    use commonware_macros::{select, test_traced};
    use commonware_runtime::{
        deterministic, tokio, Clock, Metrics, Network as RNetwork, Runner, Spawner,
    };
    use commonware_utils::{set::Ordered, NZU32};
    use futures::{channel::mpsc, SinkExt, StreamExt};
    use governor::{clock::ReasonablyRealtime, Quota};
    use rand::{CryptoRng, Rng};
    use std::{
        collections::HashSet,
        net::{IpAddr, Ipv4Addr, SocketAddr},
        time::Duration,
    };

    #[derive(Copy, Clone)]
    enum Mode {
        All,
        Some,
        One,
    }

    const DEFAULT_MESSAGE_BACKLOG: usize = 128;

    /// Ensure no message rate limiting occurred.
    ///
    /// If a message is rate limited, it would be formatted as:
    ///
    /// ```text
    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="data_0"} 1
    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="ping"} 1
    /// ```
    fn assert_no_rate_limiting(context: &impl Metrics) {
        let metrics = context.encode();
        assert!(
            !metrics.contains("messages_rate_limited_total{"),
            "no messages should be rate limited: {metrics}"
        );
    }

    /// Test connectivity between `n` peers.
    ///
    /// We set a unique `base_port` for each test to avoid "address already in use"
    /// errors when tests are run immediately after each other.
    async fn run_network(
        context: impl Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics,
        max_message_size: usize,
        base_port: u16,
        n: usize,
        mode: Mode,
    ) {
        // Create peers
        let mut peers = Vec::new();
        for i in 0..n {
            peers.push(ed25519::PrivateKey::from_seed(i as u64));
        }
        let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

        // Create networks
        let (complete_sender, mut complete_receiver) = mpsc::channel(peers.len());
        for (i, peer) in peers.iter().enumerate() {
            // Create peer context
            let context = context.with_label(&format!("peer-{i}"));

            // Derive port
            let port = base_port + i as u16;

            // Create bootstrappers
            let mut bootstrappers = Vec::new();
            if i > 0 {
                bootstrappers.push((
                    addresses[0].clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
                ));
            }

            // Create network
            let signer = peer.clone();
            let config = Config::test(
                signer.clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                bootstrappers,
                max_message_size,
            );
            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

            // Register peers
            oracle.update(0, addresses.clone().into()).await;

            // Register basic application
            let (mut sender, mut receiver) =
                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);

            // Wait to connect to all peers, and then send messages to everyone
            network.start();

            // Send/Receive messages
            context.with_label("agent").spawn({
                let mut complete_sender = complete_sender.clone();
                let addresses = addresses.clone();
                move |context| async move {
                    // Spawn a task that receives identities from the other peers
                    let receiver = context.with_label("receiver").spawn(move |_| async move {
                        // Wait for all peers to send their identity
                        let mut received = HashSet::new();
                        while received.len() < n - 1 {
                            // Ensure message equals sender identity
                            let (sender, message) = receiver.recv().await.unwrap();
                            assert_eq!(sender.as_ref(), message.as_ref());

                            // Add to received set
                            received.insert(sender);
                        }
                        complete_sender.send(()).await.unwrap();

                        // Process messages until all finished (or else sender loops could get stuck as a peer may drop)
                        loop {
                            receiver.recv().await.unwrap();
                        }
                    });

                    // Send identity to all peers
                    let msg = signer.public_key();
                    let sender = context
                        .with_label("sender")
                        .spawn(move |context| async move {
                            // Loop forever to account for unexpected message drops
                            loop {
                                match mode {
                                    Mode::One => {
                                        for (j, recipient) in addresses.iter().enumerate() {
                                            // Don't send message to self
                                            if i == j {
                                                continue;
                                            }

                                            // Loop until success
                                            loop {
                                                let sent = sender
                                                    .send(
                                                        Recipients::One(recipient.clone()),
                                                        msg.to_vec().into(),
                                                        true,
                                                    )
                                                    .await
                                                    .unwrap();
                                                if sent.len() != 1 {
                                                    context.sleep(Duration::from_millis(100)).await;
                                                    continue;
                                                }
                                                assert_eq!(&sent[0], recipient);
                                                break;
                                            }
                                        }
                                    }
                                    Mode::Some => {
                                        // Get all peers not including self
                                        let mut recipients = addresses.clone();
                                        recipients.remove(i);
                                        recipients.sort();

                                        // Loop until all peer sends successful
                                        loop {
                                            let mut sent = sender
                                                .send(
                                                    Recipients::Some(recipients.clone()),
                                                    msg.to_vec().into(),
                                                    true,
                                                )
                                                .await
                                                .unwrap();
                                            if sent.len() != n - 1 {
                                                context.sleep(Duration::from_millis(100)).await;
                                                continue;
                                            }

                                            // Compare to expected
                                            sent.sort();
                                            assert_eq!(sent, recipients);
                                            break;
                                        }
                                    }
                                    Mode::All => {
                                        // Get all peers not including self
                                        let mut recipients = addresses.clone();
                                        recipients.remove(i);
                                        recipients.sort();

                                        // Loop until all peer sends successful
                                        loop {
                                            let mut sent = sender
                                                .send(Recipients::All, msg.to_vec().into(), true)
                                                .await
                                                .unwrap();
                                            if sent.len() != n - 1 {
                                                context.sleep(Duration::from_millis(100)).await;
                                                continue;
                                            }

                                            // Compare to expected
                                            sent.sort();
                                            assert_eq!(sent, recipients);
                                            break;
                                        }
                                    }
                                };

                                // Sleep to avoid busy loop
                                context.sleep(Duration::from_secs(10)).await;
                            }
                        });

                    // Neither task should exit
                    select! {
                        receiver = receiver => {
                            panic!("receiver exited: {receiver:?}");
                        },
                        sender = sender => {
                            panic!("sender exited: {sender:?}");
                        },
                    }
                }
            });
        }

        // Wait for all peers to finish
        for _ in 0..n {
            complete_receiver.next().await.unwrap();
        }

        // Ensure no message rate limiting occurred
        assert_no_rate_limiting(&context);
    }

    fn run_deterministic_test(seed: u64, mode: Mode) {
        // Configure test
        const MAX_MESSAGE_SIZE: usize = 1_024 * 1_024; // 1MB
        const NUM_PEERS: usize = 25;
        const BASE_PORT: u16 = 3000;

        // Run first instance
        let executor = deterministic::Runner::seeded(seed);
        let state = executor.start(|context| async move {
            run_network(
                context.clone(),
                MAX_MESSAGE_SIZE,
                BASE_PORT,
                NUM_PEERS,
                mode,
            )
            .await;
            context.auditor().state()
        });

        // Compare result to second instance
        let executor = deterministic::Runner::seeded(seed);
        let state2 = executor.start(|context| async move {
            run_network(
                context.clone(),
                MAX_MESSAGE_SIZE,
                BASE_PORT,
                NUM_PEERS,
                mode,
            )
            .await;
            context.auditor().state()
        });
        assert_eq!(state, state2);
    }

    #[test_traced]
    #[ignore]
    fn test_determinism_one() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::One);
        }
    }

    #[test_traced]
    #[ignore]
    fn test_determinism_some() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::Some);
        }
    }

    #[test_traced]
    #[ignore]
    fn test_determinism_all() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::All);
        }
    }

    #[test_traced]
    fn test_tokio_connectivity() {
        let executor = tokio::Runner::default();
        executor.start(|context| async move {
            const MAX_MESSAGE_SIZE: usize = 1_024 * 1_024; // 1MB
            let base_port = 3000;
            let n = 10;
            run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
        });
    }

    #[test_traced]
    fn test_multi_index_oracle() {
        // Configure test
        let base_port = 3000;
        let n: usize = 100;

        // Initialize context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Create networks
            let mut waiters = Vec::new();
            for (i, peer) in peers.iter().enumerate() {
                // Create peer context
                let context = context.with_label(&format!("peer-{i}"));

                // Derive port
                let port = base_port + i as u16;

                // Create bootstrappers
                let mut bootstrappers = Vec::new();
                if i > 0 {
                    bootstrappers.push((
                        addresses[0].clone(),
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
                    ));
                }

                // Create network
                let signer = peer.clone();
                let config = Config::test(
                    signer.clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                    bootstrappers,
                    1_024 * 1_024, // 1MB
                );
                let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

                // Register peers at separate indices
                oracle
                    .update(0, Ordered::from([addresses[0].clone()]))
                    .await;
                oracle
                    .update(
                        1,
                        Ordered::from([addresses[1].clone(), addresses[2].clone()]),
                    )
                    .await;
                oracle
                    .update(2, addresses.iter().skip(2).cloned().collect())
                    .await;

                // Register basic application
                let (mut sender, mut receiver) =
                    network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);

                // Wait to connect to all peers, and then send messages to everyone
                network.start();

                // Send/Receive messages
                let handler = context
                    .with_label("agent")
                    .spawn(move |context| async move {
                        if i == 0 {
                            // Loop until success
                            let msg = signer.public_key();
                            loop {
                                if sender
                                    .send(Recipients::All, msg.to_vec().into(), true)
                                    .await
                                    .unwrap()
                                    .len()
                                    == n - 1
                                {
                                    break;
                                }

                                // Sleep and try again (avoid busy loop)
                                context.sleep(Duration::from_millis(100)).await;
                            }
                        } else {
                            // Ensure message equals sender identity
                            let (sender, message) = receiver.recv().await.unwrap();
                            assert_eq!(sender.as_ref(), message.as_ref());
                        }
                    });

                // Add to waiters
                waiters.push(handler);
            }

            // Wait for waiters to finish (receiver before sender)
            for waiter in waiters.into_iter().rev() {
                waiter.await.unwrap();
            }

            // Ensure no message rate limiting occurred
            assert_no_rate_limiting(&context);
        });
    }

    #[test_traced]
    fn test_message_too_large() {
        // Configure test
        let base_port = 3000;
        let n: usize = 2;

        // Initialize context
        let executor = deterministic::Runner::seeded(0);
        executor.start(|mut context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Ordered<_>>();

            // Create network
            let signer = peers[0].clone();
            let config = Config::test(
                signer.clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
                Vec::new(),
                1_024 * 1_024, // 1MB
            );
            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

            // Register peers
            oracle.update(0, addresses.clone()).await;

            // Register basic application
            let (mut sender, _) =
                network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);

            // Wait to connect to all peers, and then send messages to everyone
            network.start();

            // Create random message
            let mut msg = vec![0u8; 10 * 1024 * 1024]; // 10MB (greater than frame capacity)
            context.fill(&mut msg[..]);

            // Send message
            let recipient = Recipients::One(addresses[1].clone());
            let result = sender.send(recipient, msg.into(), true).await;
            assert!(matches!(result, Err(Error::MessageTooLarge(_))));
        });
    }

    #[test_traced]
    #[should_panic(expected = "no messages should be rate limited")]
    fn test_rate_limiting() {
        // Configure test
        let base_port = 3000;
        let n: usize = 2;

        // Initialize context
        let executor = deterministic::Runner::seeded(0);
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
            let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
            let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);

            // Create network for peer 0
            let signer0 = peers[0].clone();
            let config0 = Config::test(
                signer0.clone(),
                socket0,
                vec![(peers[1].public_key(), socket1)],
                1_024 * 1_024, // 1MB
            );
            let (mut network0, mut oracle0) = Network::new(context.with_label("peer-0"), config0);
            oracle0.update(0, addresses.clone().into()).await;
            let (mut sender0, _receiver0) =
                network0.register(0, Quota::per_hour(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
            network0.start();

            // Create network for peer 1
            let signer1 = peers[1].clone();
            let config1 = Config::test(
                signer1.clone(),
                socket1,
                vec![(peers[0].public_key(), socket0)],
                1_024 * 1_024, // 1MB
            );
            let (mut network1, mut oracle1) = Network::new(context.with_label("peer-1"), config1);
            oracle1.update(0, addresses.clone().into()).await;
            let (_sender1, _receiver1) =
                network1.register(0, Quota::per_hour(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
            network1.start();

            // Send first message, which should be allowed and consume the quota.
            let msg = vec![0u8; 1024]; // 1KB
            loop {
                // Confirm message is sent to peer
                let sent = sender0
                    .send(
                        Recipients::One(addresses[1].clone()),
                        msg.clone().into(),
                        true,
                    )
                    .await
                    .unwrap();
                if !sent.is_empty() {
                    break;
                }

                // Sleep and try again (avoid busy loop)
                context.sleep(Duration::from_millis(100)).await;
            }

            // Immediately send the second message to trigger the rate limit.
            let sent = sender0
                .send(Recipients::One(addresses[1].clone()), msg.into(), true)
                .await
                .unwrap();
            assert!(!sent.is_empty());

            // Loop until the metrics reflect the rate-limited message.
            for _ in 0..10 {
                assert_no_rate_limiting(&context);
                context.sleep(Duration::from_millis(100)).await;
            }
        });
    }
}