commonware_p2p/authenticated/discovery/mod.rs
//! Communicate with a fixed set of authenticated peers without known addresses over encrypted connections.
//!
//! `discovery` provides multiplexed communication between fully-connected peers
//! identified by a developer-specified cryptographic identity (e.g. BLS or ed25519).
//! Peer discovery occurs automatically using ordered bit vectors (sorted by authorized
//! cryptographic identities) to efficiently communicate knowledge of dialable peers.
//!
//! # Features
//!
//! - Configurable Cryptography Scheme for Peer Identities (BLS, ed25519, etc.)
//! - Automatic Peer Discovery Using Bit Vectors (Also Used as Ping Messages)
//! - Multiplexing With Configurable Rate Limiting Per Channel and Send Prioritization
//!
//! # Design
//!
//! ## Discovery
//!
//! Peer discovery operates under the assumption that all peers are aware of and synchronized on
//! the composition of peer sets at specific, user-provided indices (`u64`). Each index maps to a
//! list of authorized `PublicKey`s (`(u64, Vec<PublicKey>)`). Based on this shared knowledge, each
//! peer can construct a sorted bit vector message (`BitVec`) representing its knowledge of the
//! dialable addresses ([std::net::SocketAddr]) for the peers in that set.
//! The `BitVec` message contains:
//! - `index`: The `u64` index the bit vector applies to.
//! - `bits`: The bit vector itself, where a '1' signifies knowledge of the corresponding
//!   peer's address in the sorted list for that index.
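//!
//! As a rough illustration of the bit vector described above, the sketch below derives `bits`
//! from a sorted peer set and a map of known addresses. The `PublicKey` alias and the
//! `knowledge_bits` helper are hypothetical stand-ins used only for this example; they are not
//! part of this crate's API.
//!
//! ```rust
//! use std::collections::{BTreeSet, HashMap};
//! use std::net::SocketAddr;
//!
//! // Hypothetical stand-in for a real public key type (assumption for this sketch).
//! type PublicKey = [u8; 4];
//!
//! /// Bit `i` is `true` when an address is known for the `i`-th key of the sorted set.
//! fn knowledge_bits(
//!     set: &BTreeSet<PublicKey>,
//!     known: &HashMap<PublicKey, SocketAddr>,
//! ) -> Vec<bool> {
//!     // A `BTreeSet` iterates in sorted order, so every peer derives the same positions.
//!     set.iter().map(|key| known.contains_key(key)).collect()
//! }
//!
//! fn main() {
//!     let set = BTreeSet::from([[3u8, 0, 0, 0], [1, 0, 0, 0], [2, 0, 0, 0]]);
//!     let mut known = HashMap::new();
//!     known.insert([1u8, 0, 0, 0], "127.0.0.1:3001".parse::<SocketAddr>().unwrap());
//!     known.insert([3u8, 0, 0, 0], "127.0.0.1:3003".parse::<SocketAddr>().unwrap());
//!
//!     // Addresses are known for the first and third keys in sorted order.
//!     assert_eq!(knowledge_bits(&set, &known), vec![true, false, true]);
//! }
//! ```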
//!
//! Even if a peer knows the address of another peer in the set, if it fails to dial that peer a
//! (configurable) number of times, it will assume that information is stale and mark it as
//! '0' (unknown) in the bit vector. This prevents peers from being "stuck" with outdated
//! information about other peers.
//!
//! _Warning: If peers are not synchronized on the peer set composition at a given index,
//! discovery messages can be misinterpreted. A peer might associate a bit vector index with the
//! wrong peer or fail to parse the vector if its length doesn't match the expected set size. The
//! application layer is responsible for ensuring peer set synchronization._
//!
//! Due to their small size, these `BitVec` messages are exchanged periodically (configured by
//! `gossip_bit_vec_frequency` in the [Config]) between connected peers. This serves as both a
//! peer discovery mechanism and a keep-alive "ping" message to maintain the underlying
//! connection, especially during periods of low application-level traffic. The protocol supports
//! tracking multiple peer sets concurrently (up to `tracked_peer_sets`), each identified by its
//! `index`. This is useful, for instance, during transitions like distributed key generation
//! (DKG) where connections to both old and new peer sets are needed simultaneously.
//!
//! Upon receiving a `BitVec` message, a peer compares it against its own knowledge for the same
//! index. If the receiving peer knows addresses that the sender marked as '0' (unknown), it
//! selects a random subset of these known `PeerInfo` structures (up to `peer_gossip_max_count`)
//! and sends them back in a `Payload::Peers` message. To save bandwidth, peers will only gossip
//! `PeerInfo` for peers that they currently have a connection with. This prevents them from
//! repeatedly sending `PeerInfo` that they cannot verify is still valid. Each `PeerInfo` contains:
//! - `socket`: The [std::net::SocketAddr] of the peer.
//! - `timestamp`: A `u64` timestamp indicating when the address was attested.
//! - `public_key`: The peer's public key.
//! - `signature`: The peer's cryptographic signature over the `socket` and `timestamp`.
//!
//! If the receiver doesn't know any addresses the sender is unaware of, it sends no
//! `Payload::Peers` response; the received `BitVec` implicitly acts as a "pong".
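//!
//! The comparison step can be sketched roughly as follows. The `peers_to_share` helper is
//! hypothetical and simplified: it takes the first `max` candidates instead of a random
//! subset (to stay dependency-free) and ignores the "currently connected" filter. It only
//! illustrates the bit-by-bit comparison and the length check mentioned in the warning above.
//!
//! ```rust
//! /// Return the positions (in sorted peer set order) of up to `max` peers whose address we
//! /// know (`ours[i]`) but the sender marked as unknown (`!theirs[i]`).
//! ///
//! /// Returns `None` when the lengths differ: the two sides then disagree on the peer set
//! /// composition and the received vector cannot be interpreted.
//! fn peers_to_share(ours: &[bool], theirs: &[bool], max: usize) -> Option<Vec<usize>> {
//!     if ours.len() != theirs.len() {
//!         return None;
//!     }
//!     Some(
//!         ours.iter()
//!             .zip(theirs)
//!             .enumerate()
//!             .filter(|(_, (mine, yours))| **mine && !**yours)
//!             .map(|(i, _)| i)
//!             .take(max)
//!             .collect(),
//!     )
//! }
//!
//! fn main() {
//!     let ours = [true, true, false, true];
//!     let theirs = [true, false, false, false];
//!
//!     // The sender lacks the peers at positions 1 and 3; `max` caps the response size.
//!     assert_eq!(peers_to_share(&ours, &theirs, 5), Some(vec![1, 3]));
//!     assert_eq!(peers_to_share(&ours, &theirs, 1), Some(vec![1]));
//!
//!     // Nothing new to share: no response is needed and the `BitVec` acts as a "pong".
//!     assert_eq!(peers_to_share(&theirs, &ours, 5), Some(vec![]));
//!
//!     // A length mismatch signals unsynchronized peer sets.
//!     assert_eq!(peers_to_share(&ours, &[true], 5), None);
//! }
//! ```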
//!
//! If a peer receives a `PeerInfo` message (either directly or through gossip) containing a more
//! recent timestamp for a known peer's address, it updates its local `Record`. This updated
//! `PeerInfo` is also used in future gossip messages. Each peer generates its own signed
//! `PeerInfo` upon startup and sends it immediately after establishing a connection (following
//! the cryptographic handshake). This ensures that if a peer connects using an outdated address
//! record, it will be corrected promptly by the peer being dialed.
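//!
//! A minimal sketch of the "newer timestamp wins" rule is shown below. The `PeerInfo` struct
//! here is a trimmed-down, hypothetical version (no `public_key` or `signature`), and
//! `update_record` is an illustrative helper rather than this module's implementation; a real
//! update would also verify the signature before accepting the information.
//!
//! ```rust
//! use std::net::SocketAddr;
//!
//! // Simplified stand-in for the gossiped address attestation (assumption for this sketch).
//! #[derive(Clone, Debug, PartialEq)]
//! struct PeerInfo {
//!     socket: SocketAddr,
//!     timestamp: u64,
//! }
//!
//! /// Replace the stored record only if `incoming` carries a strictly newer timestamp.
//! /// Returns whether the record changed.
//! fn update_record(record: &mut Option<PeerInfo>, incoming: PeerInfo) -> bool {
//!     let is_newer = record
//!         .as_ref()
//!         .map_or(true, |current| incoming.timestamp > current.timestamp);
//!     if is_newer {
//!         *record = Some(incoming);
//!     }
//!     is_newer
//! }
//!
//! fn main() {
//!     let old: SocketAddr = "127.0.0.1:3000".parse().unwrap();
//!     let new: SocketAddr = "127.0.0.1:4000".parse().unwrap();
//!     let mut record = None;
//!
//!     // The first attestation is always stored.
//!     assert!(update_record(&mut record, PeerInfo { socket: old, timestamp: 10 }));
//!     // A stale attestation is ignored.
//!     assert!(!update_record(&mut record, PeerInfo { socket: new, timestamp: 9 }));
//!     // A newer attestation replaces the stored address.
//!     assert!(update_record(&mut record, PeerInfo { socket: new, timestamp: 11 }));
//!     assert_eq!(record.unwrap().socket, new);
//! }
//! ```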
//!
//! To initiate the discovery process, a peer needs a list of `bootstrappers` (defined in
//! [Config]): known peer public keys and their corresponding socket addresses. The peer
//! attempts to dial these bootstrappers, performs the handshake, sends its own `PeerInfo`, and
//! then sends a `BitVec` for the relevant peer set(s) (initially only knowing its own address,
//! marked as '1'). It then waits for responses, learning about other peers through the
//! `Payload::Peers` messages received. Bootstrapper information is persisted, and connections to
//! them are maintained even if they aren't part of any currently tracked peer sets. Different
//! peers can have different bootstrapper lists.
//!
//! _Note: If a peer (listener) receives a connection request from another peer (dialer) that
//! belongs to a registered peer set, the listener will accept the connection, even if the
//! listener itself hasn't yet learned about that specific peer set (or has an older version). The
//! core requirement is that the listener recognizes the *dialer's public key* as belonging to
//! *some* authorized set it tracks (see `actors::tracker::Actor`). This mechanism allows peers
//! with more up-to-date peer set information to connect and propagate that information, enabling
//! the listener to potentially learn about newer sets it is part of._
//!
//! ## Messages
//!
//! Application-level data is exchanged using the `Payload::Data` message type, which wraps an
//! internal `Data` structure. This structure contains:
//! - `channel`: A `u32` identifier used to route the message to the correct application handler.
//! - `message`: The arbitrary application payload as `Bytes`.
//!
//! The size of the `message` bytes must not exceed the configured
//! `max_message_size`. If it does, the sending operation will fail with
//! [Error::MessageTooLarge]. Messages can be sent with `priority`, allowing certain
//! communications to potentially bypass lower-priority messages waiting in send queues across all
//! channels. Each registered channel ([Sender], [Receiver]) handles its own message queuing
//! and rate limiting.
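//!
//! One simple way to picture the size check and send prioritization is a pair of queues, as
//! in the sketch below. `Outbox` and `SendError` are hypothetical names invented for this
//! example, and carrying the message length in the error is an assumption; the actual
//! queuing in this module may be structured differently.
//!
//! ```rust
//! use std::collections::VecDeque;
//!
//! // Hypothetical error type for this sketch (not this module's `Error`).
//! #[derive(Debug, PartialEq)]
//! enum SendError {
//!     MessageTooLarge(usize),
//! }
//!
//! // Hypothetical send queue with a high-priority and a low-priority lane.
//! struct Outbox {
//!     max_message_size: usize,
//!     high: VecDeque<Vec<u8>>,
//!     low: VecDeque<Vec<u8>>,
//! }
//!
//! impl Outbox {
//!     fn new(max_message_size: usize) -> Self {
//!         Self { max_message_size, high: VecDeque::new(), low: VecDeque::new() }
//!     }
//!
//!     /// Reject oversized messages up front, then queue by priority.
//!     fn enqueue(&mut self, message: Vec<u8>, priority: bool) -> Result<(), SendError> {
//!         if message.len() > self.max_message_size {
//!             return Err(SendError::MessageTooLarge(message.len()));
//!         }
//!         if priority {
//!             self.high.push_back(message);
//!         } else {
//!             self.low.push_back(message);
//!         }
//!         Ok(())
//!     }
//!
//!     /// Priority messages are drained before any waiting low-priority message.
//!     fn dequeue(&mut self) -> Option<Vec<u8>> {
//!         self.high.pop_front().or_else(|| self.low.pop_front())
//!     }
//! }
//!
//! fn main() {
//!     let mut outbox = Outbox::new(4);
//!     assert_eq!(outbox.enqueue(vec![0; 5], false), Err(SendError::MessageTooLarge(5)));
//!
//!     outbox.enqueue(vec![1], false).unwrap();
//!     outbox.enqueue(vec![2], true).unwrap();
//!
//!     // The priority message bypasses the earlier low-priority one.
//!     assert_eq!(outbox.dequeue(), Some(vec![2]));
//!     assert_eq!(outbox.dequeue(), Some(vec![1]));
//!     assert_eq!(outbox.dequeue(), None);
//! }
//! ```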
//!
//! ## Compression
//!
//! Stream compression is not provided at the transport layer to avoid inadvertently
//! enabling known attacks such as BREACH and CRIME. These attacks exploit the interaction
//! between compression and encryption by analyzing patterns in the resulting data.
//! By compressing secrets alongside attacker-controlled content, these attacks can infer
//! sensitive information through compression ratio analysis. Applications that choose
//! to compress data should do so with full awareness of these risks and implement
//! appropriate mitigations (such as ensuring no attacker-controlled data is compressed
//! alongside sensitive information).
//!
//! # Example
//!
//! ```rust
//! use commonware_p2p::{authenticated::discovery::{self, Network}, Sender, Recipients};
//! use commonware_cryptography::{ed25519, Signer, PrivateKey as _, PublicKey as _, PrivateKeyExt as _};
//! use commonware_runtime::{tokio, Spawner, Runner, Metrics};
//! use commonware_utils::NZU32;
//! use governor::Quota;
//! use std::net::{IpAddr, Ipv4Addr, SocketAddr};
//!
//! // Configure context
//! let runtime_cfg = tokio::Config::default();
//! let runner = tokio::Runner::new(runtime_cfg.clone());
//!
//! // Generate identity
//! //
//! // In production, the signer should be generated from a secure source of entropy.
//! let signer = ed25519::PrivateKey::from_seed(0);
//!
//! // Generate peers
//! //
//! // In production, peer identities will be provided by some external source of truth
//! // (like the staking set of a blockchain).
//! let peer1 = ed25519::PrivateKey::from_seed(1).public_key();
//! let peer2 = ed25519::PrivateKey::from_seed(2).public_key();
//! let peer3 = ed25519::PrivateKey::from_seed(3).public_key();
//!
//! // Configure bootstrappers
//! //
//! // In production, it is likely that the address of bootstrappers will be some public address.
//! let bootstrappers = vec![(peer1.clone(), SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3001))];
//!
//! // Configure namespace
//! //
//! // In production, use a unique application namespace to prevent cryptographic replay attacks.
//! let application_namespace = b"my-app-namespace";
//!
//! // Configure network
//! //
//! // In production, use a more conservative configuration like `Config::recommended`.
//! const MAX_MESSAGE_SIZE: usize = 1_024; // 1KB
//! let p2p_cfg = discovery::Config::aggressive(
//!     signer.clone(),
//!     application_namespace,
//!     SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 3000),
//!     SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3000), // Use a specific dialable addr
//!     bootstrappers,
//!     MAX_MESSAGE_SIZE,
//! );
//!
//! // Start context
//! runner.start(|context| async move {
//!     // Initialize network
//!     let (mut network, mut oracle) = Network::new(context.with_label("network"), p2p_cfg);
//!
//!     // Register authorized peers
//!     //
//!     // In production, this would be updated as new peer sets are created (like when
//!     // the composition of a validator set changes).
//!     oracle.register(0, vec![signer.public_key(), peer1, peer2, peer3]).await;
//!
//!     // Register some channel
//!     const MAX_MESSAGE_BACKLOG: usize = 128;
//!     let (mut sender, receiver) = network.register(
//!         0,
//!         Quota::per_second(NZU32!(1)),
//!         MAX_MESSAGE_BACKLOG,
//!     );
//!
//!     // Run network
//!     let network_handler = network.start();
//!
//!     // Example: Use sender
//!     let _ = sender.send(Recipients::All, bytes::Bytes::from_static(b"hello"), false).await;
//!
//!     // Shutdown network
//!     network_handler.abort();
//! });
//! ```

mod actors;
mod channels;
mod config;
mod metrics;
mod network;
mod types;

use thiserror::Error;

/// Errors that can occur when interacting with the network.
#[derive(Error, Debug)]
pub enum Error {
    #[error("message too large: {0}")]
    MessageTooLarge(usize),
    #[error("network closed")]
    NetworkClosed,
}

pub use actors::tracker::Oracle;
pub use channels::{Receiver, Sender};
pub use config::{Bootstrapper, Config};
pub use network::Network;

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Receiver, Recipients, Sender};
    use commonware_cryptography::{ed25519, PrivateKeyExt as _, Signer as _};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        deterministic, tokio, Clock, Metrics, Network as RNetwork, Runner, Spawner,
    };
    use commonware_utils::NZU32;
    use futures::{channel::mpsc, SinkExt, StreamExt};
    use governor::{clock::ReasonablyRealtime, Quota};
    use rand::{CryptoRng, Rng};
    use std::{
        collections::HashSet,
        net::{IpAddr, Ipv4Addr, SocketAddr},
        time::Duration,
    };

    #[derive(Copy, Clone)]
    enum Mode {
        All,
        Some,
        One,
    }

    const DEFAULT_MESSAGE_BACKLOG: usize = 128;

    /// Ensure no message rate limiting occurred.
    ///
    /// If a message is rate limited, it would be formatted as:
    ///
    /// ```text
    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="data_0"} 1
    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="ping"} 1
    /// ```
    fn assert_no_rate_limiting(context: &impl Metrics) {
        let metrics = context.encode();
        assert!(
            !metrics.contains("messages_rate_limited_total{"),
            "no messages should be rate limited: {metrics}"
        );
    }

    /// Test connectivity between `n` peers.
    ///
    /// We set a unique `base_port` for each test to avoid "address already in use"
    /// errors when tests are run immediately after each other.
    async fn run_network(
        context: impl Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics,
        max_message_size: usize,
        base_port: u16,
        n: usize,
        mode: Mode,
    ) {
        // Create peers
        let mut peers = Vec::new();
        for i in 0..n {
            peers.push(ed25519::PrivateKey::from_seed(i as u64));
        }
        let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

        // Create networks
        let (complete_sender, mut complete_receiver) = mpsc::channel(peers.len());
        for (i, peer) in peers.iter().enumerate() {
            // Create peer context
            let context = context.with_label(&format!("peer-{i}"));

            // Derive port
            let port = base_port + i as u16;

            // Create bootstrappers
            let mut bootstrappers = Vec::new();
            if i > 0 {
                bootstrappers.push((
                    addresses[0].clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
                ));
            }

            // Create network
            let signer = peer.clone();
            let config = Config::test(
                signer.clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                bootstrappers,
                max_message_size,
            );
            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

            // Register peers
            oracle.register(0, addresses.clone()).await;

            // Register basic application
            let (mut sender, mut receiver) =
                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);

            // Wait to connect to all peers, and then send messages to everyone
            network.start();

            // Send/Receive messages
            context.with_label("agent").spawn({
                let mut complete_sender = complete_sender.clone();
                let addresses = addresses.clone();
                move |context| async move {
                    // Wait for all peers to send their identity
                    context.with_label("receiver").spawn(move |_| async move {
                        // Wait for all peers to send their identity
                        let mut received = HashSet::new();
                        while received.len() < n - 1 {
                            // Ensure message equals sender identity
                            let (sender, message) = receiver.recv().await.unwrap();
                            assert_eq!(sender.as_ref(), message.as_ref());

                            // Add to received set
                            received.insert(sender);
                        }
                        complete_sender.send(()).await.unwrap();

                        // Process messages until all finished (or else sender loops could get stuck as a peer may drop)
                        loop {
                            receiver.recv().await.unwrap();
                        }
                    });

                    // Send identity to all peers
                    let msg = signer.public_key();
                    context
                        .with_label("sender")
                        .spawn(move |context| async move {
                            // Loop forever to account for unexpected message drops
                            loop {
                                match mode {
                                    Mode::One => {
                                        for (j, recipient) in addresses.iter().enumerate() {
                                            // Don't send message to self
                                            if i == j {
                                                continue;
                                            }

                                            // Loop until success
                                            loop {
                                                let sent = sender
                                                    .send(
                                                        Recipients::One(recipient.clone()),
                                                        msg.to_vec().into(),
                                                        true,
                                                    )
                                                    .await
                                                    .unwrap();
                                                if sent.len() != 1 {
                                                    context.sleep(Duration::from_millis(100)).await;
                                                    continue;
                                                }
                                                assert_eq!(&sent[0], recipient);
                                                break;
                                            }
                                        }
                                    }
                                    Mode::Some => {
                                        // Get all peers not including self
                                        let mut recipients = addresses.clone();
                                        recipients.remove(i);
                                        recipients.sort();

                                        // Loop until all peer sends successful
                                        loop {
                                            let mut sent = sender
                                                .send(
                                                    Recipients::Some(recipients.clone()),
                                                    msg.to_vec().into(),
                                                    true,
                                                )
                                                .await
                                                .unwrap();
                                            if sent.len() != n - 1 {
                                                context.sleep(Duration::from_millis(100)).await;
                                                continue;
                                            }

                                            // Compare to expected
                                            sent.sort();
                                            assert_eq!(sent, recipients);
                                            break;
                                        }
                                    }
                                    Mode::All => {
                                        // Get all peers not including self
                                        let mut recipients = addresses.clone();
                                        recipients.remove(i);
                                        recipients.sort();

                                        // Loop until all peer sends successful
                                        loop {
                                            let mut sent = sender
                                                .send(Recipients::All, msg.to_vec().into(), true)
                                                .await
                                                .unwrap();
                                            if sent.len() != n - 1 {
                                                context.sleep(Duration::from_millis(100)).await;
                                                continue;
                                            }

                                            // Compare to expected
                                            sent.sort();
                                            assert_eq!(sent, recipients);
                                            break;
                                        }
                                    }
                                };

                                // Sleep to avoid busy loop
                                context.sleep(Duration::from_secs(10)).await;
                            }
                        });
                }
            });
        }

        // Wait for all peers to finish
        for _ in 0..n {
            complete_receiver.next().await.unwrap();
        }

        // Ensure no message rate limiting occurred
        assert_no_rate_limiting(&context);
    }

    fn run_deterministic_test(seed: u64, mode: Mode) {
        // Configure test
        const MAX_MESSAGE_SIZE: usize = 1_024 * 1_024; // 1MB
        const NUM_PEERS: usize = 25;
        const BASE_PORT: u16 = 3000;

        // Run first instance
        let executor = deterministic::Runner::seeded(seed);
        let state = executor.start(|context| async move {
            run_network(
                context.clone(),
                MAX_MESSAGE_SIZE,
                BASE_PORT,
                NUM_PEERS,
                mode,
            )
            .await;
            context.auditor().state()
        });

        // Compare result to second instance
        let executor = deterministic::Runner::seeded(seed);
        let state2 = executor.start(|context| async move {
            run_network(
                context.clone(),
                MAX_MESSAGE_SIZE,
                BASE_PORT,
                NUM_PEERS,
                mode,
            )
            .await;
            context.auditor().state()
        });
        assert_eq!(state, state2);
    }

    #[test_traced]
    #[ignore]
    fn test_determinism_one() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::One);
        }
    }

    #[test_traced]
    #[ignore]
    fn test_determinism_some() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::Some);
        }
    }

    #[test_traced]
    #[ignore]
    fn test_determinism_all() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::All);
        }
    }

    #[test_traced]
    fn test_tokio_connectivity() {
        let cfg = tokio::Config::default();
        let executor = tokio::Runner::new(cfg.clone());
        executor.start(|context| async move {
            const MAX_MESSAGE_SIZE: usize = 1_024 * 1_024; // 1MB
            let base_port = 3000;
            let n = 10;
            run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
        });
    }

    #[test_traced]
    fn test_multi_index_oracle() {
        // Configure test
        let base_port = 3000;
        let n: usize = 100;

        // Initialize context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Create networks
            let mut waiters = Vec::new();
            for (i, peer) in peers.iter().enumerate() {
                // Create peer context
                let context = context.with_label(&format!("peer-{i}"));

                // Derive port
                let port = base_port + i as u16;

                // Create bootstrappers
                let mut bootstrappers = Vec::new();
                if i > 0 {
                    bootstrappers.push((
                        addresses[0].clone(),
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
                    ));
                }

                // Create network
                let signer = peer.clone();
                let config = Config::test(
                    signer.clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                    bootstrappers,
                    1_024 * 1_024, // 1MB
                );
                let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

                // Register peers at separate indices
                oracle.register(0, vec![addresses[0].clone()]).await;
                oracle
                    .register(1, vec![addresses[1].clone(), addresses[2].clone()])
                    .await;
                oracle
                    .register(2, addresses.iter().skip(2).cloned().collect())
                    .await;

                // Register basic application
                let (mut sender, mut receiver) =
                    network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);

                // Wait to connect to all peers, and then send messages to everyone
                network.start();

                // Send/Receive messages
                let handler = context
                    .with_label("agent")
                    .spawn(move |context| async move {
                        if i == 0 {
                            // Loop until success
                            let msg = signer.public_key();
                            loop {
                                if sender
                                    .send(Recipients::All, msg.to_vec().into(), true)
                                    .await
                                    .unwrap()
                                    .len()
                                    == n - 1
                                {
                                    break;
                                }

                                // Sleep and try again (avoid busy loop)
                                context.sleep(Duration::from_millis(100)).await;
                            }
                        } else {
                            // Ensure message equals sender identity
                            let (sender, message) = receiver.recv().await.unwrap();
                            assert_eq!(sender.as_ref(), message.as_ref());
                        }
                    });

                // Add to waiters
                waiters.push(handler);
            }

            // Wait for waiters to finish (receiver before sender)
            for waiter in waiters.into_iter().rev() {
                waiter.await.unwrap();
            }

            // Ensure no message rate limiting occurred
            assert_no_rate_limiting(&context);
        });
    }

    #[test_traced]
    fn test_message_too_large() {
        // Configure test
        let base_port = 3000;
        let n: usize = 2;

        // Initialize context
        let executor = deterministic::Runner::seeded(0);
        executor.start(|mut context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Create network
            let signer = peers[0].clone();
            let config = Config::test(
                signer.clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
                Vec::new(),
                1_024 * 1_024, // 1MB
            );
            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

            // Register peers
            oracle.register(0, addresses.clone()).await;

            // Register basic application
            let (mut sender, _) =
                network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);

            // Start network
            network.start();

            // Create random message
            let mut msg = vec![0u8; 10 * 1024 * 1024]; // 10MB (greater than frame capacity)
            context.fill(&mut msg[..]);

            // Send message
            let recipient = Recipients::One(addresses[1].clone());
            let result = sender.send(recipient, msg.into(), true).await;
            assert!(matches!(result, Err(Error::MessageTooLarge(_))));
        });
    }

    #[test_traced]
    #[should_panic(expected = "no messages should be rate limited")]
    fn test_rate_limiting() {
        // Configure test
        let base_port = 3000;
        let n: usize = 2;

        // Initialize context
        let executor = deterministic::Runner::seeded(0);
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
            let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
            let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);

            // Create network for peer 0
            let signer0 = peers[0].clone();
            let config0 = Config::test(
                signer0.clone(),
                socket0,
                vec![(peers[1].public_key(), socket1)],
                1_024 * 1_024, // 1MB
            );
            let (mut network0, mut oracle0) = Network::new(context.with_label("peer-0"), config0);
            oracle0.register(0, addresses.clone()).await;
            let (mut sender0, _receiver0) =
                network0.register(0, Quota::per_hour(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
            network0.start();

            // Create network for peer 1
            let signer1 = peers[1].clone();
            let config1 = Config::test(
                signer1.clone(),
                socket1,
                vec![(peers[0].public_key(), socket0)],
                1_024 * 1_024, // 1MB
            );
            let (mut network1, mut oracle1) = Network::new(context.with_label("peer-1"), config1);
            oracle1.register(0, addresses.clone()).await;
            let (_sender1, _receiver1) =
                network1.register(0, Quota::per_hour(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
            network1.start();

            // Send first message, which should be allowed and consume the quota.
            let msg = vec![0u8; 1024]; // 1KB
            loop {
                // Confirm message is sent to peer
                let sent = sender0
                    .send(
                        Recipients::One(addresses[1].clone()),
                        msg.clone().into(),
                        true,
                    )
                    .await
                    .unwrap();
                if !sent.is_empty() {
                    break;
                }

                // Sleep and try again (avoid busy loop)
                context.sleep(Duration::from_millis(100)).await;
            }

            // Immediately send the second message to trigger the rate limit.
            let sent = sender0
                .send(Recipients::One(addresses[1].clone()), msg.into(), true)
                .await
                .unwrap();
            assert!(!sent.is_empty());

            // Poll the metrics until they reflect the rate-limited message (the assertion
            // is expected to panic, which is what `should_panic` checks for).
            for _ in 0..10 {
                assert_no_rate_limiting(&context);
                context.sleep(Duration::from_millis(100)).await;
            }
        });
    }
}
742}