commonware_p2p/authenticated/lookup/
mod.rs

1//! Communicate with a fixed set of authenticated peers with known addresses over encrypted connections.
2//!
3//! `lookup` provides multiplexed communication between fully-connected peers
4//! identified by a developer-specified cryptographic identity (i.e. BLS, ed25519, etc.).
5//! Unlike `discovery`, peers in `lookup` don't use a discovery mechanism to find each other;
6//! each peer's address is supplied by the application layer.
7//!
8//! # Features
9//!
10//! - Configurable Cryptography Scheme for Peer Identities (BLS, ed25519, etc.)
11//! - Multiplexing With Configurable Rate Limiting Per Channel and Send Prioritization
12//!
13//! # Design
14//!
15//! ## Discovery
16//!
17//! This module operates under the assumption that all peers are aware of and synchronized on
18//! the composition of peer sets at specific, user-provided indices (`u64`). Each index maps to a
19//! list of peer `PublicKey`/`SocketAddr` pairs (`(u64, Vec<(PublicKey, SocketAddr)>)`).
20//!
21//! On startup, the application supplies the initial set of peers. The `Oracle` actor allows
22//! the application to update peer --> address mappings so that peers can find each other.
23//!
24//! Any inbound connection attempts from an IP address that is not in the union of all registered
25//! peer sets will be rejected.
26//!
27//! ## Messages
28//!
29//! Application-level data is exchanged using the `Data` message type. This structure contains:
30//! - `channel`: A `u32` identifier used to route the message to the correct application handler.
31//! - `message`: The arbitrary application payload as `Bytes`.
32//!
33//! The size of the `message` bytes must not exceed the configured
34//! `max_message_size`. If it does, the sending operation will fail with
35//! [Error::MessageTooLarge]. Messages can be sent with `priority`, allowing certain
36//! communications to potentially bypass lower-priority messages waiting in send queues across all
37//! channels. Each registered channel ([Sender], [Receiver]) handles its own message queuing
38//! and rate limiting.
39//!
40//! ## Compression
41//!
42//! Stream compression is not provided at the transport layer to avoid inadvertently
43//! enabling known attacks such as BREACH and CRIME. These attacks exploit the interaction
44//! between compression and encryption by analyzing patterns in the resulting data.
45//! By compressing secrets alongside attacker-controlled content, these attacks can infer
46//! sensitive information through compression ratio analysis. Applications that choose
47//! to compress data should do so with full awareness of these risks and implement
48//! appropriate mitigations (such as ensuring no attacker-controlled data is compressed
49//! alongside sensitive information).
50//!
51//! ## Rate Limiting
52//!
53//! There are five primary rate limits:
54//!
55//! - `max_concurrent_handshakes`: The maximum number of concurrent handshake attempts allowed.
56//! - `allowed_handshake_rate_per_ip`: The rate limit for handshake attempts originating from a single IP address.
57//! - `allowed_handshake_rate_per_subnet`: The rate limit for handshake attempts originating from a single IP subnet.
58//! - `allowed_connection_rate_per_peer`: The rate limit for connections to a single peer (incoming or outgoing).
59//! - `rate` (per channel): The rate limit for messages sent on a single channel.
60//!
61//! _Users should consider these rate limits as best-effort protection against moderate abuse. Targeted abuse (e.g. DDoS)
62//! must be mitigated with an external proxy (that limits inbound connection attempts to authorized IPs)._
63//!
64//! # Example
65//!
66//! ```rust
67//! use commonware_p2p::{authenticated::lookup::{self, Network}, Manager, Sender, Recipients};
68//! use commonware_cryptography::{ed25519, Signer, PrivateKey as _, PublicKey as _, PrivateKeyExt as _};
69//! use commonware_runtime::{deterministic, Spawner, Runner, Metrics};
70//! use commonware_utils::{NZU32, set::OrderedAssociated};
71//! use governor::Quota;
72//! use std::net::{IpAddr, Ipv4Addr, SocketAddr};
73//!
74//! // Configure context
75//! let runtime_cfg = deterministic::Config::default();
76//! let runner = deterministic::Runner::new(runtime_cfg.clone());
77//!
78//! // Generate identity
79//! //
80//! // In production, the signer should be generated from a secure source of entropy.
81//! let my_sk = ed25519::PrivateKey::from_seed(0);
82//! let my_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3000);
83//!
84//! // Generate peers
85//! //
86//! // In production, peer identities will be provided by some external source of truth
87//! // (like the staking set of a blockchain).
88//! let peer1 = ed25519::PrivateKey::from_seed(1).public_key();
89//! let peer1_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3001);
90//! let peer2 = ed25519::PrivateKey::from_seed(2).public_key();
91//! let peer2_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3002);
92//! let peer3 = ed25519::PrivateKey::from_seed(3).public_key();
93//! let peer3_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3003);
94//!
95//! // Configure namespace
96//! //
97//! // In production, use a unique application namespace to prevent cryptographic replay attacks.
98//! let application_namespace = b"my-app-namespace";
99//!
100//! // Configure network
101//! //
102//! // In production, use a more conservative configuration like `Config::recommended`.
103//! const MAX_MESSAGE_SIZE: usize = 1_024; // 1KB
104//! let p2p_cfg = lookup::Config::local(
105//!     my_sk.clone(),
106//!     application_namespace,
107//!     my_addr,
108//!     my_addr,
109//!     MAX_MESSAGE_SIZE,
110//! );
111//!
112//! // Start context
113//! runner.start(|context| async move {
114//!     // Initialize network
115//!     let (mut network, mut oracle) = Network::new(context.with_label("network"), p2p_cfg);
116//!
117//!     // Register authorized peers
118//!     //
119//!     // In production, this would be updated as new peer sets are created (like when
120//!     // the composition of a validator set changes).
121//!     oracle.update(
122//!         0,
123//!         OrderedAssociated::from([(my_sk.public_key(), my_addr), (peer1, peer1_addr), (peer2, peer2_addr), (peer3, peer3_addr)])
124//!     ).await;
125//!
126//!     // Register some channel
127//!     const MAX_MESSAGE_BACKLOG: usize = 128;
128//!     let (mut sender, receiver) = network.register(
129//!         0,
130//!         Quota::per_second(NZU32!(1)),
131//!         MAX_MESSAGE_BACKLOG,
132//!     );
133//!
134//!     // Run network
135//!     let network_handler = network.start();
136//!
137//!     // Example: Use sender
138//!     let _ = sender.send(Recipients::All, bytes::Bytes::from_static(b"hello"), false).await;
139//!
140//!     // Shutdown network
141//!     network_handler.abort();
142//! });
143//! ```
144
145mod actors;
146mod channels;
147mod config;
148mod metrics;
149mod network;
150mod types;
151
152use thiserror::Error;
153
154/// Errors that can occur when interacting with the network.
155#[derive(Error, Debug)]
156pub enum Error {
157    #[error("message too large: {0}")]
158    MessageTooLarge(usize),
159    #[error("network closed")]
160    NetworkClosed,
161}
162
163pub use actors::tracker::Oracle;
164pub use channels::{Receiver, Sender};
165pub use config::Config;
166pub use network::Network;
167
168#[cfg(test)]
169mod tests {
170    use super::*;
171    use crate::{Manager, Receiver, Recipients, Sender};
172    use commonware_cryptography::{ed25519, PrivateKeyExt as _, Signer as _};
173    use commonware_macros::{select, test_traced};
174    use commonware_runtime::{
175        deterministic, tokio, Clock, Metrics, Network as RNetwork, Runner, Spawner,
176    };
177    use commonware_utils::{set::OrderedAssociated, NZU32};
178    use futures::{channel::mpsc, SinkExt, StreamExt};
179    use governor::{clock::ReasonablyRealtime, Quota};
180    use rand::{CryptoRng, Rng};
181    use std::{
182        collections::HashSet,
183        net::{IpAddr, Ipv4Addr, SocketAddr},
184        time::Duration,
185    };
186
187    #[derive(Copy, Clone)]
188    enum Mode {
189        All,
190        Some,
191        One,
192    }
193
194    const DEFAULT_MESSAGE_BACKLOG: usize = 128;
195
196    /// Ensure no message rate limiting occurred.
197    ///
198    /// If a message is rate limited, it would be formatted as:
199    ///
200    /// ```text
201    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="data_0"} 1
202    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="ping"} 1
203    /// ```
204    fn assert_no_rate_limiting(context: &impl Metrics) {
205        let metrics = context.encode();
206        assert!(
207            !metrics.contains("messages_rate_limited_total{"),
208            "no messages should be rate limited: {metrics}"
209        );
210    }
211
212    /// Test connectivity between `n` peers.
213    ///
214    /// We set a unique `base_port` for each test to avoid "address already in use"
215    /// errors when tests are run immediately after each other.
216    async fn run_network(
217        context: impl Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics,
218        max_message_size: usize,
219        base_port: u16,
220        n: usize,
221        mode: Mode,
222    ) {
223        // Create peers
224        let mut peers_and_sks = Vec::new();
225        for i in 0..n {
226            let private_key = ed25519::PrivateKey::from_seed(i as u64);
227            let public_key = private_key.public_key();
228            let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
229            peers_and_sks.push((private_key, public_key, address));
230        }
231        let peers = peers_and_sks
232            .iter()
233            .map(|(_, pub_key, addr)| (pub_key.clone(), *addr))
234            .collect::<Vec<_>>();
235
236        // Create networks
237        let (complete_sender, mut complete_receiver) = mpsc::channel(peers.len());
238        for (i, (private_key, public_key, address)) in peers_and_sks.iter().enumerate() {
239            let public_key = public_key.clone();
240
241            // Create peer context
242            let context = context.with_label(&format!("peer-{i}"));
243
244            // Create network
245            let config = Config::test(private_key.clone(), *address, max_message_size);
246            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
247
248            // Register peers
249            oracle
250                .update(0, OrderedAssociated::from(peers.clone()))
251                .await;
252
253            // Register basic application
254            let (mut sender, mut receiver) =
255                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
256
257            // Wait to connect to all peers, and then send messages to everyone
258            network.start();
259
260            // Send/Receive messages
261            let mut public_keys = peers
262                .iter()
263                .filter_map(|(pk, _)| {
264                    if pk != &public_key {
265                        Some(pk.clone())
266                    } else {
267                        None
268                    }
269                })
270                .collect::<Vec<_>>();
271            public_keys.sort();
272            context.with_label("agent").spawn({
273                let mut complete_sender = complete_sender.clone();
274                let peers = peers.clone();
275                move |context| async move {
276                    // Wait for all peers to send their identity
277                    let receiver = context.with_label("receiver").spawn(move |_| async move {
278                        // Wait for all peers to send their identity
279                        let mut received = HashSet::new();
280                        while received.len() < n - 1 {
281                            // Ensure message equals sender identity
282                            let (sender, message) = receiver.recv().await.unwrap();
283                            assert_eq!(sender.as_ref(), message.as_ref());
284
285                            // Add to received set
286                            received.insert(sender);
287                        }
288                        complete_sender.send(()).await.unwrap();
289
290                        // Process messages until all finished (or else sender loops could get stuck as a peer may drop)
291                        loop {
292                            receiver.recv().await.unwrap();
293                        }
294                    });
295
296                    // Send identity to all peers
297                    let sender = context
298                        .with_label("sender")
299                        .spawn(move |context| async move {
300                            // Loop forever to account for unexpected message drops
301                            loop {
302                                match mode {
303                                    Mode::One => {
304                                        for (j, (pub_key, _)) in peers.iter().enumerate() {
305                                            // Don't send message to self
306                                            if i == j {
307                                                continue;
308                                            }
309
310                                            // Loop until success
311                                            loop {
312                                                let sent = sender
313                                                    .send(
314                                                        Recipients::One(pub_key.clone()),
315                                                        public_key.to_vec().into(),
316                                                        true,
317                                                    )
318                                                    .await
319                                                    .unwrap();
320                                                if sent.len() != 1 {
321                                                    context.sleep(Duration::from_millis(100)).await;
322                                                    continue;
323                                                }
324                                                assert_eq!(&sent[0], pub_key);
325                                                break;
326                                            }
327                                        }
328                                    }
329                                    Mode::Some => {
330                                        // Get all peers not including self
331                                        let mut recipients = peers.clone();
332                                        recipients.remove(i);
333                                        recipients.sort();
334
335                                        // Loop until all peer sends successful
336                                        loop {
337                                            let mut sent = sender
338                                                .send(
339                                                    Recipients::Some(public_keys.clone()),
340                                                    public_key.to_vec().into(),
341                                                    true,
342                                                )
343                                                .await
344                                                .unwrap();
345                                            if sent.len() != n - 1 {
346                                                context.sleep(Duration::from_millis(100)).await;
347                                                continue;
348                                            }
349
350                                            // Compare to expected
351                                            sent.sort();
352                                            assert_eq!(sent, public_keys);
353                                            break;
354                                        }
355                                    }
356                                    Mode::All => {
357                                        // Get all peers not including self
358                                        let mut recipients = peers.clone();
359                                        recipients.remove(i);
360                                        recipients.sort();
361
362                                        // Loop until all peer sends successful
363                                        loop {
364                                            let mut sent = sender
365                                                .send(
366                                                    Recipients::All,
367                                                    public_key.to_vec().into(),
368                                                    true,
369                                                )
370                                                .await
371                                                .unwrap();
372                                            if sent.len() != n - 1 {
373                                                context.sleep(Duration::from_millis(100)).await;
374                                                continue;
375                                            }
376
377                                            // Compare to expected
378                                            sent.sort();
379                                            assert_eq!(sent, public_keys);
380                                            break;
381                                        }
382                                    }
383                                };
384
385                                // Sleep to avoid busy loop
386                                context.sleep(Duration::from_secs(10)).await;
387                            }
388                        });
389
390                    // Neither task should exit
391                    select! {
392                        receiver = receiver => {
393                            panic!("receiver exited: {receiver:?}");
394                        },
395                        sender = sender => {
396                            panic!("sender exited: {sender:?}");
397                        },
398                    }
399                }
400            });
401        }
402
403        // Wait for all peers to finish
404        for _ in 0..n {
405            complete_receiver.next().await.unwrap();
406        }
407
408        // Ensure no message rate limiting occurred
409        assert_no_rate_limiting(&context);
410    }
411
412    fn run_deterministic_test(seed: u64, mode: Mode) {
413        // Configure test
414        const MAX_MESSAGE_SIZE: usize = 1_024 * 1_024; // 1MB
415        const NUM_PEERS: usize = 25;
416        const BASE_PORT: u16 = 3000;
417
418        // Run first instance
419        let executor = deterministic::Runner::seeded(seed);
420        let state = executor.start(|context| async move {
421            run_network(
422                context.clone(),
423                MAX_MESSAGE_SIZE,
424                BASE_PORT,
425                NUM_PEERS,
426                mode,
427            )
428            .await;
429            context.auditor().state()
430        });
431
432        // Compare result to second instance
433        let executor = deterministic::Runner::seeded(seed);
434        let state2 = executor.start(|context| async move {
435            run_network(
436                context.clone(),
437                MAX_MESSAGE_SIZE,
438                BASE_PORT,
439                NUM_PEERS,
440                mode,
441            )
442            .await;
443            context.auditor().state()
444        });
445        assert_eq!(state, state2);
446    }
447
448    #[test_traced]
449    #[ignore]
450    fn test_determinism_one() {
451        for i in 0..10 {
452            run_deterministic_test(i, Mode::One);
453        }
454    }
455
456    #[test_traced]
457    #[ignore]
458    fn test_determinism_some() {
459        for i in 0..10 {
460            run_deterministic_test(i, Mode::Some);
461        }
462    }
463
464    #[test_traced]
465    #[ignore]
466    fn test_determinism_all() {
467        for i in 0..10 {
468            run_deterministic_test(i, Mode::All);
469        }
470    }
471
472    #[test_traced]
473    fn test_tokio_connectivity() {
474        let executor = tokio::Runner::default();
475        executor.start(|context| async move {
476            const MAX_MESSAGE_SIZE: usize = 1_024 * 1_024; // 1MB
477            let base_port = 4000;
478            let n = 10;
479            run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
480        });
481    }
482
483    #[test_traced]
484    fn test_multi_index_oracle() {
485        // Configure test
486        let base_port = 3000;
487        let n: usize = 10;
488
489        // Initialize context
490        let executor = deterministic::Runner::default();
491        executor.start(|context| async move {
492            // Create peers
493            let mut peers_and_sks = Vec::new();
494            for i in 0..n {
495                let sk = ed25519::PrivateKey::from_seed(i as u64);
496                let pk = sk.public_key();
497                let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
498                peers_and_sks.push((sk, pk, addr));
499            }
500            let peers = peers_and_sks
501                .iter()
502                .map(|(_, pk, addr)| (pk.clone(), *addr))
503                .collect::<Vec<_>>();
504
505            // Create networks
506            let mut waiters = Vec::new();
507            for (i, (peer_sk, peer_pk, peer_addr)) in peers_and_sks.iter().enumerate() {
508                // Create peer context
509                let context = context.with_label(&format!("peer-{i}"));
510
511                // Create network
512                let config = Config::test(
513                    peer_sk.clone(),
514                    *peer_addr,
515                    1_024 * 1_024, // 1MB
516                );
517                let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
518
519                // Register peers at separate indices
520                oracle
521                    .update(0, OrderedAssociated::from([peers[0].clone()]))
522                    .await;
523                oracle
524                    .update(
525                        1,
526                        OrderedAssociated::from([peers[1].clone(), peers[2].clone()]),
527                    )
528                    .await;
529                oracle
530                    .update(2, peers.iter().skip(2).cloned().collect())
531                    .await;
532
533                // Register basic application
534                let (mut sender, mut receiver) =
535                    network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
536
537                // Wait to connect to all peers, and then send messages to everyone
538                network.start();
539
540                // Send/Receive messages
541                let msg = peer_pk.clone();
542                let handler = context
543                    .with_label("agent")
544                    .spawn(move |context| async move {
545                        if i == 0 {
546                            // Loop until success
547                            loop {
548                                if sender
549                                    .send(Recipients::All, msg.to_vec().into(), true)
550                                    .await
551                                    .unwrap()
552                                    .len()
553                                    == n - 1
554                                {
555                                    break;
556                                }
557
558                                // Sleep and try again (avoid busy loop)
559                                context.sleep(Duration::from_millis(100)).await;
560                            }
561                        } else {
562                            // Ensure message equals sender identity
563                            let (sender, message) = receiver.recv().await.unwrap();
564                            assert_eq!(sender.as_ref(), message.as_ref());
565                        }
566                    });
567
568                // Add to waiters
569                waiters.push(handler);
570            }
571
572            // Wait for waiters to finish (receiver before sender)
573            for waiter in waiters.into_iter().rev() {
574                waiter.await.unwrap();
575            }
576
577            // Ensure no message rate limiting occurred
578            assert_no_rate_limiting(&context);
579        });
580    }
581
582    #[test_traced]
583    fn test_message_too_large() {
584        // Configure test
585        let base_port = 3000;
586        let n: usize = 2;
587
588        // Initialize context
589        let executor = deterministic::Runner::seeded(0);
590        executor.start(|mut context| async move {
591            // Create peers
592            let mut peers_and_sks = Vec::new();
593            for i in 0..n {
594                let peer_sk = ed25519::PrivateKey::from_seed(i as u64);
595                let peer_pk = peer_sk.public_key();
596                let peer_addr =
597                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
598                peers_and_sks.push((peer_sk, peer_pk, peer_addr));
599            }
600            let peers: OrderedAssociated<_, _> = peers_and_sks
601                .iter()
602                .map(|(_, pk, addr)| (pk.clone(), *addr))
603                .collect();
604
605            // Create network
606            let (sk, _, addr) = peers_and_sks[0].clone();
607            let config = Config::test(
608                sk,
609                addr,
610                1_024 * 1_024, // 1MB
611            );
612            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
613
614            // Register peers
615            oracle.update(0, peers.clone()).await;
616
617            // Register basic application
618            let (mut sender, _) =
619                network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
620
621            // Wait to connect to all peers, and then send messages to everyone
622            network.start();
623
624            // Crate random message
625            let mut msg = vec![0u8; 10 * 1024 * 1024]; // 10MB (greater than frame capacity)
626            context.fill(&mut msg[..]);
627
628            // Send message
629            let recipient = Recipients::One(peers[1].clone());
630            let result = sender.send(recipient, msg.into(), true).await;
631            assert!(matches!(result, Err(Error::MessageTooLarge(_))));
632        });
633    }
634
635    #[test_traced]
636    #[should_panic(expected = "no messages should be rate limited")]
637    fn test_rate_limiting() {
638        // Configure test
639        let base_port = 3000;
640        let n: usize = 2;
641
642        // Initialize context
643        let executor = deterministic::Runner::seeded(0);
644        executor.start(|context| async move {
645            // Create peers
646            let mut peers_and_sks = Vec::new();
647            for i in 0..n {
648                let sk = ed25519::PrivateKey::from_seed(i as u64);
649                let pk = sk.public_key();
650                let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
651                peers_and_sks.push((sk, pk, addr));
652            }
653            let peers: OrderedAssociated<_, _> = peers_and_sks
654                .iter()
655                .map(|(_, pk, addr)| (pk.clone(), *addr))
656                .collect();
657            let (sk0, _, addr0) = peers_and_sks[0].clone();
658            let (sk1, pk1, addr1) = peers_and_sks[1].clone();
659
660            // Create network for peer 0
661            let config0 = Config::test(sk0, addr0, 1_024 * 1_024); // 1MB
662            let (mut network0, mut oracle0) = Network::new(context.with_label("peer-0"), config0);
663            oracle0.update(0, peers.clone()).await;
664            let (mut sender0, _receiver0) =
665                network0.register(0, Quota::per_hour(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
666            network0.start();
667
668            // Create network for peer 1
669            let config1 = Config::test(sk1, addr1, 1_024 * 1_024); // 1MB
670            let (mut network1, mut oracle1) = Network::new(context.with_label("peer-1"), config1);
671            oracle1.update(0, peers.clone()).await;
672            let (_sender1, _receiver1) =
673                network1.register(0, Quota::per_hour(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
674            network1.start();
675
676            // Send first message, which should be allowed and consume the quota.
677            let msg = vec![0u8; 1024]; // 1KB
678            loop {
679                // Confirm message is sent to peer
680                let sent = sender0
681                    .send(Recipients::One(pk1.clone()), msg.clone().into(), true)
682                    .await
683                    .unwrap();
684                if !sent.is_empty() {
685                    break;
686                }
687
688                // Sleep and try again (avoid busy loop)
689                context.sleep(Duration::from_millis(100)).await;
690            }
691
692            // Immediately send the second message to trigger the rate limit.
693            let sent = sender0
694                .send(Recipients::One(pk1), msg.into(), true)
695                .await
696                .unwrap();
697            assert!(!sent.is_empty());
698
699            // Loop until the metrics reflect the rate-limited message.
700            for _ in 0..10 {
701                assert_no_rate_limiting(&context);
702                context.sleep(Duration::from_millis(100)).await;
703            }
704        });
705    }
706}