commonware_p2p/authenticated/lookup/
mod.rs

1//! Communicate with a fixed set of authenticated peers with known addresses over encrypted connections.
2//!
3//! `lookup` provides multiplexed communication between fully-connected peers
4//! identified by a developer-specified cryptographic identity (i.e. BLS, ed25519, etc.).
5//! Unlike `discovery`, peers in `lookup` don't use a discovery mechanism to find each other;
6//! each peer's address is supplied by the application layer.
7//!
8//! # Features
9//!
10//! - Configurable Cryptography Scheme for Peer Identities (BLS, ed25519, etc.)
11//! - Multiplexing With Configurable Rate Limiting Per Channel and Send Prioritization
12//!
13//! # Design
14//!
15//! ## Discovery
16//!
17//! This module operates under the assumption that all peers are aware of and synchronized on
18//! the composition of peer sets at specific, user-provided indices (`u64`). Each index maps to a
19//! list of peer `PublicKey`/`SocketAddr` pairs (`(u64, Vec<(PublicKey, SocketAddr)>)`).
20//!
21//! On startup, the application supplies the initial set of peers. The `Oracle` actor allows
22//! the application to update peer --> address mappings so that peers can find each other.
23//!
24//! Any inbound connection attempts from an IP address that is not in the union of all registered
25//! peer sets will be rejected.
26//!
27//! ## Messages
28//!
29//! Application-level data is exchanged using the `Data` message type. This structure contains:
30//! - `channel`: A `u32` identifier used to route the message to the correct application handler.
31//! - `message`: The arbitrary application payload as `Bytes`.
32//!
33//! The size of the `message` bytes must not exceed the configured
34//! `max_message_size`. If it does, the sending operation will fail with
35//! [Error::MessageTooLarge]. Messages can be sent with `priority`, allowing certain
36//! communications to potentially bypass lower-priority messages waiting in send queues across all
37//! channels. Each registered channel ([Sender], [Receiver]) handles its own message queuing
38//! and rate limiting.
39//!
40//! ## Compression
41//!
42//! Stream compression is not provided at the transport layer to avoid inadvertently
43//! enabling known attacks such as BREACH and CRIME. These attacks exploit the interaction
44//! between compression and encryption by analyzing patterns in the resulting data.
45//! By compressing secrets alongside attacker-controlled content, these attacks can infer
46//! sensitive information through compression ratio analysis. Applications that choose
47//! to compress data should do so with full awareness of these risks and implement
48//! appropriate mitigations (such as ensuring no attacker-controlled data is compressed
49//! alongside sensitive information).
50//!
51//! ## Rate Limiting
52//!
53//! There are five primary rate limits:
54//!
55//! - `max_concurrent_handshakes`: The maximum number of concurrent handshake attempts allowed.
56//! - `allowed_handshake_rate_per_ip`: The rate limit for handshake attempts originating from a single IP address.
57//! - `allowed_handshake_rate_per_subnet`: The rate limit for handshake attempts originating from a single IP subnet.
58//! - `allowed_connection_rate_per_peer`: The rate limit for connections to a single peer (incoming or outgoing).
59//! - `rate` (per channel): The rate limit for messages sent on a single channel.
60//!
61//! _Users should consider these rate limits as best-effort protection against moderate abuse. Targeted abuse (e.g. DDoS)
62//! must be mitigated with an external proxy (that limits inbound connection attempts to authorized IPs)._
63//!
64//! ## Message Delivery
65//!
66//! Outgoing messages are dropped when a peer's send buffer is full, preventing slow peers
67//! from blocking sends to other peers. Incoming messages are dropped when the application's
68//! receive buffer is full, ensuring ping messages continue to flow and connections remain
69//! healthy.
70//!
71//! # Example
72//!
73//! ```rust
74//! use commonware_p2p::{authenticated::lookup::{self, Network}, Address, Manager, Sender, Recipients};
//! use commonware_cryptography::{ed25519, Signer, PrivateKey as _, PublicKey as _};
76//! use commonware_runtime::{deterministic, Metrics, Quota, Runner, Spawner};
77//! use commonware_utils::{NZU32, ordered::Map};
78//! use std::net::{IpAddr, Ipv4Addr, SocketAddr};
79//!
80//! // Configure context
81//! let runtime_cfg = deterministic::Config::default();
82//! let runner = deterministic::Runner::new(runtime_cfg);
83//!
84//! // Generate identity
85//! //
86//! // In production, the signer should be generated from a secure source of entropy.
87//! let my_sk = ed25519::PrivateKey::from_seed(0);
88//! let my_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
89//!
90//! // Generate peers
91//! //
92//! // In production, peer identities will be provided by some external source of truth
93//! // (like the staking set of a blockchain).
94//! let peer1 = ed25519::PrivateKey::from_seed(1).public_key();
95//! let peer1_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3001);
96//! let peer2 = ed25519::PrivateKey::from_seed(2).public_key();
97//! let peer2_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3002);
98//! let peer3 = ed25519::PrivateKey::from_seed(3).public_key();
99//! let peer3_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3003);
100//!
101//! // Configure namespace
102//! //
103//! // In production, use a unique application namespace to prevent cryptographic replay attacks.
104//! let application_namespace = b"my-app-namespace";
105//!
106//! // Configure network
107//! //
108//! // In production, use a more conservative configuration like `Config::recommended`.
109//! const MAX_MESSAGE_SIZE: u32 = 1_024; // 1KB
110//! let p2p_cfg = lookup::Config::local(
111//!     my_sk.clone(),
112//!     application_namespace,
113//!     my_addr,
114//!     MAX_MESSAGE_SIZE,
115//! );
116//!
117//! // Start context
118//! runner.start(|context| async move {
119//!     // Initialize network
120//!     let (mut network, mut oracle) = Network::new(context.with_label("network"), p2p_cfg);
121//!
122//!     // Register authorized peers
123//!     //
124//!     // In production, this would be updated as new peer sets are created (like when
125//!     // the composition of a validator set changes).
126//!     let peers: Map<_, Address> = [
127//!         (my_sk.public_key(), my_addr.into()),
128//!         (peer1, peer1_addr.into()),
129//!         (peer2, peer2_addr.into()),
130//!         (peer3, peer3_addr.into()),
131//!     ].try_into().unwrap();
132//!     oracle.update(0, peers).await;
133//!
134//!     // Register some channel
135//!     const MAX_MESSAGE_BACKLOG: usize = 128;
136//!     let (mut sender, receiver) = network.register(
137//!         0,
138//!         Quota::per_second(NZU32!(1)),
139//!         MAX_MESSAGE_BACKLOG,
140//!     );
141//!
142//!     // Run network
143//!     network.start();
144//!
145//!     // Example: Use sender
146//!     let _ = sender.send(Recipients::All, bytes::Bytes::from_static(b"hello"), false).await;
147//!
148//!     // Graceful shutdown (stops all spawned tasks)
149//!     context.stop(0, None).await.unwrap();
150//! });
151//! ```
152
153mod actors;
154mod channels;
155mod config;
156mod metrics;
157mod network;
158mod types;
159
160use thiserror::Error;
161
162/// Errors that can occur when interacting with the network.
163#[derive(Error, Debug)]
164pub enum Error {
165    #[error("message too large: {0}")]
166    MessageTooLarge(usize),
167    #[error("network closed")]
168    NetworkClosed,
169}
170
171pub use actors::tracker::Oracle;
172pub use channels::{Receiver, Sender};
173pub use config::Config;
174pub use network::Network;
175
176#[cfg(test)]
177mod tests {
178    use super::*;
179    use crate::{Address, Blocker, Ingress, Manager, Receiver, Recipients, Sender};
180    use commonware_cryptography::{ed25519, Signer as _};
181    use commonware_macros::{select, test_group, test_traced};
182    use commonware_runtime::{
183        count_running_tasks, deterministic, tokio, Clock, Metrics, Network as RNetwork, Quota,
184        Resolver, Runner, Spawner,
185    };
186    use commonware_utils::{
187        hostname,
188        ordered::{Map, Set},
189        Hostname, TryCollect, NZU32,
190    };
191    use futures::{channel::mpsc, SinkExt, StreamExt};
192    use rand_core::{CryptoRngCore, RngCore};
193    use std::{
194        collections::HashSet,
195        net::{IpAddr, Ipv4Addr, SocketAddr},
196        time::Duration,
197    };
198
199    #[derive(Copy, Clone)]
200    enum Mode {
201        All,
202        Some,
203        One,
204    }
205
206    const MAX_MESSAGE_SIZE: u32 = 1_024 * 1_024; // 1MB
207    const DEFAULT_MESSAGE_BACKLOG: usize = 128;
208
209    /// Ensure no message rate limiting occurred.
210    ///
211    /// If a message is rate limited, it would be formatted as:
212    ///
213    /// ```text
214    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="data_0"} 1
215    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="ping"} 1
216    /// ```
217    fn assert_no_rate_limiting(context: &impl Metrics) {
218        let metrics = context.encode();
219        assert!(
220            !metrics.contains("messages_rate_limited_total{"),
221            "no messages should be rate limited: {metrics}"
222        );
223    }
224
225    /// Test connectivity between `n` peers.
226    ///
227    /// We set a unique `base_port` for each test to avoid "address already in use"
228    /// errors when tests are run immediately after each other.
229    async fn run_network(
230        context: impl Spawner + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
231        max_message_size: u32,
232        base_port: u16,
233        n: usize,
234        mode: Mode,
235    ) {
236        // Create peers
237        let mut peers_and_sks = Vec::new();
238        for i in 0..n {
239            let private_key = ed25519::PrivateKey::from_seed(i as u64);
240            let public_key = private_key.public_key();
241            let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
242            peers_and_sks.push((private_key, public_key, address));
243        }
244        let peers = peers_and_sks
245            .iter()
246            .map(|(_, pub_key, addr)| (pub_key.clone(), (*addr).into()))
247            .collect::<Vec<_>>();
248
249        // Create networks
250        let (complete_sender, mut complete_receiver) = mpsc::channel(peers.len());
251        for (i, (private_key, public_key, address)) in peers_and_sks.iter().enumerate() {
252            let public_key = public_key.clone();
253
254            // Create peer context
255            let context = context.with_label(&format!("peer_{i}"));
256
257            // Create network
258            let config = Config::test(private_key.clone(), *address, max_message_size);
259            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
260
261            // Register peers
262            oracle.update(0, peers.clone().try_into().unwrap()).await;
263
264            // Register basic application
265            let (mut sender, mut receiver) =
266                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
267
268            // Wait to connect to all peers, and then send messages to everyone
269            network.start();
270
271            // Send/Receive messages
272            let mut public_keys = peers
273                .iter()
274                .filter_map(|(pk, _)| {
275                    if pk != &public_key {
276                        Some(pk.clone())
277                    } else {
278                        None
279                    }
280                })
281                .collect::<Vec<_>>();
282            public_keys.sort();
283            context.with_label("agent").spawn({
284                let mut complete_sender = complete_sender.clone();
285                let peers = peers.clone();
286                move |context| async move {
287                    // Wait for all peers to send their identity
288                    let receiver = context.with_label("receiver").spawn(move |_| async move {
289                        // Wait for all peers to send their identity
290                        let mut received = HashSet::new();
291                        while received.len() < n - 1 {
292                            // Ensure message equals sender identity
293                            let (sender, message) = receiver.recv().await.unwrap();
294                            assert_eq!(sender.as_ref(), message.as_ref());
295
296                            // Add to received set
297                            received.insert(sender);
298                        }
299                        complete_sender.send(()).await.unwrap();
300
301                        // Process messages until all finished (or else sender loops could get stuck as a peer may drop)
302                        loop {
303                            receiver.recv().await.unwrap();
304                        }
305                    });
306
307                    // Send identity to all peers
308                    let sender = context
309                        .with_label("sender")
310                        .spawn(move |context| async move {
311                            // Loop forever to account for unexpected message drops
312                            loop {
313                                match mode {
314                                    Mode::One => {
315                                        for (j, (pub_key, _)) in peers.iter().enumerate() {
316                                            // Don't send message to self
317                                            if i == j {
318                                                continue;
319                                            }
320
321                                            // Loop until success
322                                            loop {
323                                                let sent = sender
324                                                    .send(
325                                                        Recipients::One(pub_key.clone()),
326                                                        public_key.as_ref(),
327                                                        true,
328                                                    )
329                                                    .await
330                                                    .unwrap();
331                                                if sent.len() != 1 {
332                                                    context.sleep(Duration::from_millis(100)).await;
333                                                    continue;
334                                                }
335                                                assert_eq!(&sent[0], pub_key);
336                                                break;
337                                            }
338                                        }
339                                    }
340                                    Mode::Some => {
341                                        // Get all peers not including self
342                                        let mut recipients = peers.clone();
343                                        recipients.remove(i);
344                                        recipients.sort();
345
346                                        // Loop until all peer sends successful
347                                        loop {
348                                            let mut sent = sender
349                                                .send(
350                                                    Recipients::Some(public_keys.clone()),
351                                                    public_key.as_ref(),
352                                                    true,
353                                                )
354                                                .await
355                                                .unwrap();
356                                            if sent.len() != n - 1 {
357                                                context.sleep(Duration::from_millis(100)).await;
358                                                continue;
359                                            }
360
361                                            // Compare to expected
362                                            sent.sort();
363                                            assert_eq!(sent, public_keys);
364                                            break;
365                                        }
366                                    }
367                                    Mode::All => {
368                                        // Get all peers not including self
369                                        let mut recipients = peers.clone();
370                                        recipients.remove(i);
371                                        recipients.sort();
372
373                                        // Loop until all peer sends successful
374                                        loop {
375                                            let mut sent = sender
376                                                .send(Recipients::All, public_key.as_ref(), true)
377                                                .await
378                                                .unwrap();
379                                            if sent.len() != n - 1 {
380                                                context.sleep(Duration::from_millis(100)).await;
381                                                continue;
382                                            }
383
384                                            // Compare to expected
385                                            sent.sort();
386                                            assert_eq!(sent, public_keys);
387                                            break;
388                                        }
389                                    }
390                                };
391
392                                // Sleep to avoid busy loop
393                                context.sleep(Duration::from_secs(10)).await;
394                            }
395                        });
396
397                    // Neither task should exit
398                    select! {
399                        receiver = receiver => {
400                            panic!("receiver exited: {receiver:?}");
401                        },
402                        sender = sender => {
403                            panic!("sender exited: {sender:?}");
404                        },
405                    }
406                }
407            });
408        }
409
410        // Wait for all peers to finish
411        for _ in 0..n {
412            complete_receiver.next().await.unwrap();
413        }
414
415        // Ensure no message rate limiting occurred
416        assert_no_rate_limiting(&context);
417    }
418
419    fn run_deterministic_test(seed: u64, mode: Mode) {
420        // Configure test
421        const NUM_PEERS: usize = 25;
422        const BASE_PORT: u16 = 3000;
423
424        // Run first instance
425        let executor = deterministic::Runner::seeded(seed);
426        let state = executor.start(|context| async move {
427            run_network(
428                context.clone(),
429                MAX_MESSAGE_SIZE,
430                BASE_PORT,
431                NUM_PEERS,
432                mode,
433            )
434            .await;
435            context.auditor().state()
436        });
437
438        // Compare result to second instance
439        let executor = deterministic::Runner::seeded(seed);
440        let state2 = executor.start(|context| async move {
441            run_network(
442                context.clone(),
443                MAX_MESSAGE_SIZE,
444                BASE_PORT,
445                NUM_PEERS,
446                mode,
447            )
448            .await;
449            context.auditor().state()
450        });
451        assert_eq!(state, state2);
452    }
453
454    #[test_group("slow")]
455    #[test_traced]
456    fn test_determinism_one() {
457        for i in 0..10 {
458            run_deterministic_test(i, Mode::One);
459        }
460    }
461
462    #[test_group("slow")]
463    #[test_traced]
464    fn test_determinism_some() {
465        for i in 0..10 {
466            run_deterministic_test(i, Mode::Some);
467        }
468    }
469
470    #[test_group("slow")]
471    #[test_traced]
472    fn test_determinism_all() {
473        for i in 0..10 {
474            run_deterministic_test(i, Mode::All);
475        }
476    }
477
478    #[test_traced]
479    fn test_tokio_connectivity() {
480        let executor = tokio::Runner::default();
481        executor.start(|context| async move {
482            let base_port = 4000;
483            let n = 10;
484            run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
485        });
486    }
487
488    #[test_traced]
489    fn test_multi_index_oracle() {
490        // Configure test
491        let base_port = 3000;
492        let n: usize = 10;
493
494        // Initialize context
495        let executor = deterministic::Runner::default();
496        executor.start(|context| async move {
497            // Create peers
498            let mut peers_and_sks = Vec::new();
499            for i in 0..n {
500                let sk = ed25519::PrivateKey::from_seed(i as u64);
501                let pk = sk.public_key();
502                let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
503                peers_and_sks.push((sk, pk, addr));
504            }
505            let peers = peers_and_sks
506                .iter()
507                .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
508                .collect::<Vec<_>>();
509
510            // Create networks
511            let mut waiters = Vec::new();
512            for (i, (peer_sk, peer_pk, peer_addr)) in peers_and_sks.iter().enumerate() {
513                // Create peer context
514                let context = context.with_label(&format!("peer_{i}"));
515
516                // Create network
517                let config = Config::test(
518                    peer_sk.clone(),
519                    *peer_addr,
520                    1_024 * 1_024, // 1MB
521                );
522                let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
523
524                // Register peers at separate indices
525                oracle
526                    .update(0, [peers[0].clone()].try_into().unwrap())
527                    .await;
528                oracle
529                    .update(1, [peers[1].clone(), peers[2].clone()].try_into().unwrap())
530                    .await;
531                oracle
532                    .update(2, peers.iter().skip(2).cloned().try_collect().unwrap())
533                    .await;
534
535                // Register basic application
536                let (mut sender, mut receiver) =
537                    network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
538
539                // Wait to connect to all peers, and then send messages to everyone
540                network.start();
541
542                // Send/Receive messages
543                let msg = peer_pk.clone();
544                let handler = context
545                    .with_label("agent")
546                    .spawn(move |context| async move {
547                        if i == 0 {
548                            // Loop until success
549                            loop {
550                                if sender
551                                    .send(Recipients::All, msg.as_ref(), true)
552                                    .await
553                                    .unwrap()
554                                    .len()
555                                    == n - 1
556                                {
557                                    break;
558                                }
559
560                                // Sleep and try again (avoid busy loop)
561                                context.sleep(Duration::from_millis(100)).await;
562                            }
563                        } else {
564                            // Ensure message equals sender identity
565                            let (sender, message) = receiver.recv().await.unwrap();
566                            assert_eq!(sender.as_ref(), message.as_ref());
567                        }
568                    });
569
570                // Add to waiters
571                waiters.push(handler);
572            }
573
574            // Wait for waiters to finish (receiver before sender)
575            for waiter in waiters.into_iter().rev() {
576                waiter.await.unwrap();
577            }
578
579            // Ensure no message rate limiting occurred
580            assert_no_rate_limiting(&context);
581        });
582    }
583
584    #[test_traced]
585    fn test_message_too_large() {
586        // Configure test
587        let base_port = 3000;
588        let n: usize = 2;
589
590        // Initialize context
591        let executor = deterministic::Runner::seeded(0);
592        executor.start(|mut context| async move {
593            // Create peers
594            let mut peers_and_sks = Vec::new();
595            for i in 0..n {
596                let peer_sk = ed25519::PrivateKey::from_seed(i as u64);
597                let peer_pk = peer_sk.public_key();
598                let peer_addr =
599                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
600                peers_and_sks.push((peer_sk, peer_pk, peer_addr));
601            }
602            let peers: Map<_, _> = peers_and_sks
603                .iter()
604                .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
605                .try_collect()
606                .unwrap();
607
608            // Create network
609            let (sk, _, addr) = peers_and_sks[0].clone();
610            let config = Config::test(
611                sk,
612                addr,
613                1_024 * 1_024, // 1MB
614            );
615            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
616
617            // Register peers
618            oracle.update(0, peers.clone()).await;
619
620            // Register basic application
621            let (mut sender, _) =
622                network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
623
624            // Wait to connect to all peers, and then send messages to everyone
625            network.start();
626
627            // Crate random message
628            let mut msg = vec![0u8; 10 * 1024 * 1024]; // 10MB (greater than frame capacity)
629            context.fill_bytes(&mut msg[..]);
630
631            // Send message
632            let recipient = Recipients::One(peers[1].clone());
633            let result = sender.send(recipient, &msg[..], true).await;
634            assert!(matches!(result, Err(Error::MessageTooLarge(_))));
635        });
636    }
637
638    #[test_traced]
639    fn test_rate_limiting() {
640        // Configure test
641        let base_port = 3000;
642        let n: usize = 2;
643
644        // Initialize context
645        let executor = deterministic::Runner::seeded(0);
646        executor.start(|context| async move {
647            // Create peers
648            let mut peers_and_sks = Vec::new();
649            for i in 0..n {
650                let sk = ed25519::PrivateKey::from_seed(i as u64);
651                let pk = sk.public_key();
652                let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
653                peers_and_sks.push((sk, pk, addr));
654            }
655            let peers: Map<_, _> = peers_and_sks
656                .iter()
657                .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
658                .try_collect()
659                .unwrap();
660            let (sk0, _, addr0) = peers_and_sks[0].clone();
661            let (sk1, pk1, addr1) = peers_and_sks[1].clone();
662
663            // Create network for peer 0
664            let config0 = Config::test(sk0, addr0, 1_024 * 1_024); // 1MB
665            let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
666            oracle0.update(0, peers.clone()).await;
667            let (mut sender0, _receiver0) =
668                network0.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
669            network0.start();
670
671            // Create network for peer 1
672            let config1 = Config::test(sk1, addr1, 1_024 * 1_024); // 1MB
673            let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
674            oracle1.update(0, peers.clone()).await;
675            let (_sender1, _receiver1) =
676                network1.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
677            network1.start();
678
679            // Send first message, which should be allowed and consume the quota.
680            let msg = vec![0u8; 1024]; // 1KB
681            loop {
682                // Confirm message is sent to peer
683                let sent = sender0
684                    .send(Recipients::One(pk1.clone()), &msg[..], true)
685                    .await
686                    .unwrap();
687                if !sent.is_empty() {
688                    break;
689                }
690
691                // Ensure we don't rate limit outbound sends while
692                // waiting for peers to connect
693                context.sleep(Duration::from_mins(1)).await
694            }
695
696            // Immediately send the second message to trigger the rate limit.
697            // With partial sends, rate-limited recipients return empty vec (not error).
698            // Outbound rate limiting skips the peer, returns empty vec.
699            let sent = sender0
700                .send(Recipients::One(pk1), &msg[..], true)
701                .await
702                .unwrap();
703            assert!(sent.is_empty());
704
705            // Give the metrics time to reflect the rate-limited message.
706            for _ in 0..10 {
707                assert_no_rate_limiting(&context);
708                context.sleep(Duration::from_millis(100)).await;
709            }
710        });
711    }
712
713    #[test_traced]
714    fn test_unordered_peer_sets() {
715        let (n, base_port) = (10, 3000);
716        let executor = deterministic::Runner::default();
717        executor.start(|context| async move {
718            // Create peers
719            let mut peers_and_sks = Vec::new();
720            for i in 0..n {
721                let sk = ed25519::PrivateKey::from_seed(i as u64);
722                let pk = sk.public_key();
723                let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
724                peers_and_sks.push((sk, pk, addr));
725            }
726            let peer0 = peers_and_sks[0].clone();
727            let config = Config::test(peer0.0, peer0.2, 1_024 * 1_024);
728            let (network, mut oracle) = Network::new(context.with_label("network"), config);
729            network.start();
730
731            // Subscribe to peer sets
732            let mut subscription = oracle.subscribe().await;
733
734            // Register initial peer set
735            let set10: Map<_, _> = peers_and_sks
736                .iter()
737                .take(2)
738                .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
739                .try_collect()
740                .unwrap();
741            oracle.update(10, set10.clone()).await;
742            let (id, new, all) = subscription.next().await.unwrap();
743            assert_eq!(id, 10);
744            assert_eq!(&new, set10.keys());
745            assert_eq!(&all, set10.keys());
746
747            // Register old peer sets (ignored)
748            let set9: Map<_, _> = peers_and_sks
749                .iter()
750                .skip(2)
751                .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
752                .try_collect()
753                .unwrap();
754            oracle.update(9, set9.clone()).await;
755
756            // Add new peer set
757            let set11: Map<_, _> = peers_and_sks
758                .iter()
759                .skip(4)
760                .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
761                .try_collect()
762                .unwrap();
763            oracle.update(11, set11.clone()).await;
764            let (id, new, all) = subscription.next().await.unwrap();
765            assert_eq!(id, 11);
766            assert_eq!(&new, set11.keys());
767            let all_keys: Set<_> = set10
768                .into_keys()
769                .into_iter()
770                .chain(set11.into_keys().into_iter())
771                .try_collect()
772                .unwrap();
773            assert_eq!(all, all_keys);
774        });
775    }
776
777    #[test_traced]
778    fn test_graceful_shutdown() {
779        let base_port = 3000;
780        let n: usize = 5;
781
782        let executor = deterministic::Runner::default();
783        executor.start(|context| async move {
784            // Create peers
785            let mut peers_and_sks = Vec::new();
786            for i in 0..n {
787                let sk = ed25519::PrivateKey::from_seed(i as u64);
788                let pk = sk.public_key();
789                let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
790                peers_and_sks.push((sk, pk, addr));
791            }
792            let peers: Map<_, _> = peers_and_sks
793                .iter()
794                .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
795                .try_collect()
796                .unwrap();
797
798            // Create networks for all peers
799            let (complete_sender, mut complete_receiver) = mpsc::channel(n);
800            for (i, (sk, pk, addr)) in peers_and_sks.iter().enumerate() {
801                let peer_context = context.with_label(&format!("peer_{i}"));
802                let config = Config::test(sk.clone(), *addr, 1_024 * 1_024);
803                let (mut network, mut oracle) =
804                    Network::new(peer_context.with_label("network"), config);
805
806                // Register peer set
807                oracle.update(0, peers.clone()).await;
808
809                let (mut sender, mut receiver) =
810                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
811                network.start();
812
813                peer_context.with_label("agent").spawn({
814                    let mut complete_sender = complete_sender.clone();
815                    let pk = pk.clone();
816                    move |context| async move {
817                        // Wait to connect to at least one other peer
818                        let expected_connections = if i == 0 { n - 1 } else { 1 };
819
820                        // Send a message
821                        loop {
822                            let sent = sender
823                                .send(Recipients::All, pk.as_ref(), true)
824                                .await
825                                .unwrap();
826                            if sent.len() >= expected_connections {
827                                break;
828                            }
829                            context.sleep(Duration::from_millis(100)).await;
830                        }
831
832                        // Signal that this peer is connected
833                        complete_sender.send(()).await.unwrap();
834
835                        // Keep receiving messages until shutdown
836                        loop {
837                            select! {
838                                result = receiver.recv() => {
839                                    if result.is_err() {
840                                        // Channel closed due to shutdown
841                                        break;
842                                    }
843                                },
844                                _ = context.stopped() => {
845                                    // Graceful shutdown signal received
846                                    break;
847                                }
848                            }
849                        }
850                    }
851                });
852            }
853
854            // Wait for all peers to establish connectivity
855            for _ in 0..n {
856                complete_receiver.next().await.unwrap();
857            }
858
859            // Verify that network actors started for all peers
860            let metrics_before = context.encode();
861            let is_running = |name: &str| -> bool {
862                metrics_before.lines().any(|line| {
863                    line.starts_with("runtime_tasks_running{")
864                        && line.contains(&format!("name=\"{name}\""))
865                        && line.contains("kind=\"Task\"")
866                        && line.trim_end().ends_with(" 1")
867                })
868            };
869            for i in 0..n {
870                let prefix = format!("peer_{i}_network");
871                assert!(
872                    is_running(&format!("{prefix}_tracker")),
873                    "peer_{i} tracker should be running"
874                );
875                assert!(
876                    is_running(&format!("{prefix}_router")),
877                    "peer_{i} router should be running"
878                );
879                assert!(
880                    is_running(&format!("{prefix}_spawner")),
881                    "peer_{i} spawner should be running"
882                );
883                assert!(
884                    is_running(&format!("{prefix}_listener")),
885                    "peer_{i} listener should be running"
886                );
887                assert!(
888                    is_running(&format!("{prefix}_dialer")),
889                    "peer_{i} dialer should be running"
890                );
891            }
892
893            // All peers are connected - now trigger graceful shutdown
894            let shutdown_context = context.clone();
895            context.with_label("shutdown").spawn(move |_| async move {
896                // Trigger graceful shutdown
897                let result = shutdown_context.stop(0, Some(Duration::from_secs(5))).await;
898
899                // Shutdown should complete successfully without timeout
900                assert!(
901                    result.is_ok(),
902                    "graceful shutdown should complete: {result:?}"
903                );
904            });
905
906            // Wait for shutdown to complete
907            context.stopped().await.unwrap();
908
909            // Give the runtime a tick to process task completions and update metrics
910            context.sleep(Duration::from_millis(100)).await;
911
912            // Verify that all network actors stopped
913            let metrics_after = context.encode();
914            let is_stopped = |name: &str| -> bool {
915                metrics_after.lines().any(|line| {
916                    line.starts_with("runtime_tasks_running{")
917                        && line.contains(&format!("name=\"{name}\""))
918                        && line.contains("kind=\"Task\"")
919                        && line.trim_end().ends_with(" 0")
920                })
921            };
922            for i in 0..n {
923                let prefix = format!("peer_{i}_network");
924                assert!(
925                    is_stopped(&format!("{prefix}_tracker")),
926                    "peer_{i} tracker should be stopped"
927                );
928                assert!(
929                    is_stopped(&format!("{prefix}_router")),
930                    "peer_{i} router should be stopped"
931                );
932                assert!(
933                    is_stopped(&format!("{prefix}_spawner")),
934                    "peer_{i} spawner should be stopped"
935                );
936                assert!(
937                    is_stopped(&format!("{prefix}_listener")),
938                    "peer_{i} listener should be stopped"
939                );
940                assert!(
941                    is_stopped(&format!("{prefix}_dialer")),
942                    "peer_{i} dialer should be stopped"
943                );
944            }
945        });
946    }
947
948    #[test_traced]
949    fn test_subscription_includes_self_when_registered() {
950        let base_port = 3000;
951        let executor = deterministic::Runner::default();
952        executor.start(|context| async move {
953            // Create self (peer0) and other peers
954            let self_sk = ed25519::PrivateKey::from_seed(0);
955            let self_pk = self_sk.public_key();
956            let self_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
957
958            let other_pk = ed25519::PrivateKey::from_seed(1).public_key();
959            let other_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
960
961            // Create network for peer0 (self)
962            let config = Config::test(self_sk, self_addr, 1_024 * 1_024);
963            let (network, mut oracle) = Network::new(context.with_label("network"), config);
964            network.start();
965
966            // Subscribe to peer sets
967            let mut subscription = oracle.subscribe().await;
968
969            // Register a peer set that does NOT include self
970            let peer_set: Map<_, _> = [(other_pk.clone(), other_addr.into())].try_into().unwrap();
971            oracle.update(1, peer_set.clone()).await;
972
973            // Receive subscription notification
974            let (id, new, all) = subscription.next().await.unwrap();
975            assert_eq!(id, 1);
976            assert_eq!(new.len(), 1);
977            assert_eq!(all.len(), 1);
978
979            // Self should NOT be in the new set
980            assert!(
981                new.position(&self_pk).is_none(),
982                "new set should not include self"
983            );
984            assert!(
985                new.position(&other_pk).is_some(),
986                "new set should include other"
987            );
988
989            // Self should NOT be in the tracked set (not registered)
990            assert!(
991                all.position(&self_pk).is_none(),
992                "tracked peers should not include self"
993            );
994            assert!(
995                all.position(&other_pk).is_some(),
996                "tracked peers should include other"
997            );
998
999            // Now register a peer set that DOES include self
1000            let peer_set: Map<_, _> = [
1001                (self_pk.clone(), self_addr.into()),
1002                (other_pk.clone(), other_addr.into()),
1003            ]
1004            .try_into()
1005            .unwrap();
1006            oracle.update(2, peer_set.clone()).await;
1007
1008            // Receive subscription notification
1009            let (id, new, all) = subscription.next().await.unwrap();
1010            assert_eq!(id, 2);
1011            assert_eq!(new.len(), 2);
1012            assert_eq!(all.len(), 2);
1013
1014            // Both peers should be in the new set
1015            assert!(
1016                new.position(&self_pk).is_some(),
1017                "new set should include self"
1018            );
1019            assert!(
1020                new.position(&other_pk).is_some(),
1021                "new set should include other"
1022            );
1023
1024            // Both peers should be in the tracked set
1025            assert!(
1026                all.position(&self_pk).is_some(),
1027                "tracked peers should include self"
1028            );
1029            assert!(
1030                all.position(&other_pk).is_some(),
1031                "tracked peers should include other"
1032            );
1033        });
1034    }
1035
1036    #[test_traced]
1037    fn test_dns_peer_addresses() {
1038        let base_port = 3200;
1039        let n: usize = 3;
1040
1041        let executor = deterministic::Runner::default();
1042        executor.start(|context| async move {
1043            // Create peers
1044            let mut peers_and_sks = Vec::new();
1045            for i in 0..n {
1046                let private_key = ed25519::PrivateKey::from_seed(i as u64);
1047                let public_key = private_key.public_key();
1048                let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
1049                let host_str = format!("peer-{i}.local");
1050                let host = Hostname::new(&host_str).unwrap();
1051                peers_and_sks.push((private_key, public_key, socket, host_str, host));
1052            }
1053
1054            // Register DNS mappings for all peers
1055            for (_, _, socket, host_str, _) in &peers_and_sks {
1056                context.resolver_register(host_str.clone(), Some(vec![socket.ip()]));
1057            }
1058
1059            // Create peer addresses with DNS ingress
1060            let peers: Vec<(_, Address)> = peers_and_sks
1061                .iter()
1062                .map(|(_, pk, socket, _, host)| {
1063                    (
1064                        pk.clone(),
1065                        Address::Asymmetric {
1066                            ingress: Ingress::Dns {
1067                                host: host.clone(),
1068                                port: socket.port(),
1069                            },
1070                            egress: *socket,
1071                        },
1072                    )
1073                })
1074                .collect();
1075
1076            // Create networks
1077            let (complete_sender, mut complete_receiver) = mpsc::channel(n);
1078            for (i, (private_key, public_key, socket, _, _)) in peers_and_sks.iter().enumerate() {
1079                let context = context.with_label(&format!("peer_{i}"));
1080
1081                // Create network
1082                let config = Config::test(private_key.clone(), *socket, 1_024 * 1_024);
1083                let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
1084
1085                // Register peers with DNS addresses
1086                oracle.update(0, peers.clone().try_into().unwrap()).await;
1087
1088                // Register channel
1089                let (mut sender, mut receiver) =
1090                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1091
1092                network.start();
1093
1094                // Send/Receive messages
1095                let pk = public_key.clone();
1096                context.with_label("agent").spawn({
1097                    let mut complete_sender = complete_sender.clone();
1098                    let peers = peers.clone();
1099                    move |context| async move {
1100                        // Wait for messages from other peers
1101                        let receiver = context.with_label("receiver").spawn(move |_| async move {
1102                            let mut received = HashSet::new();
1103                            while received.len() < n - 1 {
1104                                let (sender, message) = receiver.recv().await.unwrap();
1105                                assert_eq!(sender.as_ref(), message.as_ref());
1106                                received.insert(sender);
1107                            }
1108                            complete_sender.send(()).await.unwrap();
1109
1110                            loop {
1111                                receiver.recv().await.unwrap();
1112                            }
1113                        });
1114
1115                        // Send identity to all peers
1116                        let sender_task =
1117                            context
1118                                .with_label("sender")
1119                                .spawn(move |context| async move {
1120                                    loop {
1121                                        let mut recipients: Vec<_> = peers
1122                                            .iter()
1123                                            .filter(|(p, _)| p != &pk)
1124                                            .map(|(p, _)| p.clone())
1125                                            .collect();
1126                                        recipients.sort();
1127
1128                                        loop {
1129                                            let mut sent = sender
1130                                                .send(Recipients::All, pk.as_ref(), true)
1131                                                .await
1132                                                .unwrap();
1133                                            if sent.len() != n - 1 {
1134                                                context.sleep(Duration::from_millis(100)).await;
1135                                                continue;
1136                                            }
1137                                            sent.sort();
1138                                            assert_eq!(sent, recipients);
1139                                            break;
1140                                        }
1141
1142                                        context.sleep(Duration::from_secs(10)).await;
1143                                    }
1144                                });
1145
1146                        select! {
1147                            receiver = receiver => { panic!("receiver exited: {receiver:?}") },
1148                            sender = sender_task => { panic!("sender exited: {sender:?}") },
1149                        }
1150                    }
1151                });
1152            }
1153
1154            // Wait for all peers to exchange messages
1155            for _ in 0..n {
1156                complete_receiver.next().await.unwrap();
1157            }
1158
1159            assert_no_rate_limiting(&context);
1160        });
1161    }
1162
1163    #[test_traced]
1164    fn test_mixed_socket_and_dns_addresses() {
1165        let base_port = 3300;
1166        let n: usize = 4;
1167
1168        let executor = deterministic::Runner::default();
1169        executor.start(|context| async move {
1170            // Create peers
1171            let mut peers_and_sks = Vec::new();
1172            for i in 0..n {
1173                let private_key = ed25519::PrivateKey::from_seed(i as u64);
1174                let public_key = private_key.public_key();
1175                let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
1176                peers_and_sks.push((private_key, public_key, socket));
1177            }
1178
1179            // Register DNS mappings for peers 2 and 3 only
1180            for (i, (_, _, socket)) in peers_and_sks.iter().enumerate().skip(2) {
1181                context.resolver_register(format!("peer-{i}.local"), Some(vec![socket.ip()]));
1182            }
1183
1184            // Create peer addresses - peers 0,1 use Symmetric, peers 2,3 use DNS Asymmetric
1185            let peers: Vec<(_, Address)> = peers_and_sks
1186                .iter()
1187                .enumerate()
1188                .map(|(i, (_, pk, socket))| {
1189                    let addr = if i < 2 {
1190                        // Peers 0, 1: Symmetric socket address
1191                        Address::Symmetric(*socket)
1192                    } else {
1193                        // Peers 2, 3: Asymmetric with DNS ingress
1194                        Address::Asymmetric {
1195                            ingress: Ingress::Dns {
1196                                host: hostname!(&format!("peer-{i}.local")),
1197                                port: socket.port(),
1198                            },
1199                            egress: *socket,
1200                        }
1201                    };
1202                    (pk.clone(), addr)
1203                })
1204                .collect();
1205
1206            // Create networks
1207            let (complete_sender, mut complete_receiver) = mpsc::channel(n);
1208            for (i, (private_key, public_key, socket)) in peers_and_sks.iter().enumerate() {
1209                let context = context.with_label(&format!("peer_{i}"));
1210
1211                // Create network
1212                let config = Config::test(private_key.clone(), *socket, 1_024 * 1_024);
1213                let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
1214
1215                // Register peers with mixed addresses
1216                oracle.update(0, peers.clone().try_into().unwrap()).await;
1217
1218                // Register channel
1219                let (mut sender, mut receiver) =
1220                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1221
1222                network.start();
1223
1224                // Send/Receive messages
1225                let pk = public_key.clone();
1226                context.with_label("agent").spawn({
1227                    let mut complete_sender = complete_sender.clone();
1228                    let peers = peers.clone();
1229                    move |context| async move {
1230                        // Wait for messages from other peers
1231                        let receiver = context.with_label("receiver").spawn(move |_| async move {
1232                            let mut received = HashSet::new();
1233                            while received.len() < n - 1 {
1234                                let (sender, message) = receiver.recv().await.unwrap();
1235                                assert_eq!(sender.as_ref(), message.as_ref());
1236                                received.insert(sender);
1237                            }
1238                            complete_sender.send(()).await.unwrap();
1239
1240                            loop {
1241                                receiver.recv().await.unwrap();
1242                            }
1243                        });
1244
1245                        // Send identity to all peers
1246                        let sender_task =
1247                            context
1248                                .with_label("sender")
1249                                .spawn(move |context| async move {
1250                                    loop {
1251                                        let mut recipients: Vec<_> = peers
1252                                            .iter()
1253                                            .filter(|(p, _)| p != &pk)
1254                                            .map(|(p, _)| p.clone())
1255                                            .collect();
1256                                        recipients.sort();
1257
1258                                        loop {
1259                                            let mut sent = sender
1260                                                .send(Recipients::All, pk.as_ref(), true)
1261                                                .await
1262                                                .unwrap();
1263                                            if sent.len() != n - 1 {
1264                                                context.sleep(Duration::from_millis(100)).await;
1265                                                continue;
1266                                            }
1267                                            sent.sort();
1268                                            assert_eq!(sent, recipients);
1269                                            break;
1270                                        }
1271
1272                                        context.sleep(Duration::from_secs(10)).await;
1273                                    }
1274                                });
1275
1276                        select! {
1277                            receiver = receiver => { panic!("receiver exited: {receiver:?}") },
1278                            sender = sender_task => { panic!("sender exited: {sender:?}") },
1279                        }
1280                    }
1281                });
1282            }
1283
1284            // Wait for all peers to exchange messages
1285            for _ in 0..n {
1286                complete_receiver.next().await.unwrap();
1287            }
1288
1289            assert_no_rate_limiting(&context);
1290        });
1291    }
1292
1293    #[test_traced]
1294    fn test_dns_resolving_to_private_ip_not_dialed() {
1295        // Test that when allow_private_ips=false, DNS addresses that resolve
1296        // to private IPs are not dialed.
1297        let base_port = 4400;
1298        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1299        executor.start(|context| async move {
1300            let peer0 = ed25519::PrivateKey::from_seed(0);
1301            let peer1 = ed25519::PrivateKey::from_seed(1);
1302
1303            let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
1304            let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
1305
1306            // Register DNS mapping that resolves to localhost (private IP)
1307            context.resolver_register("peer-0.local".to_string(), Some(vec![socket0.ip()]));
1308
1309            // Create peer 0 with allow_private_ips=true
1310            let mut config0 = Config::test(peer0.clone(), socket0, 1_024 * 1_024);
1311            config0.allow_private_ips = true;
1312            let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
1313
1314            // Peer 0 knows about peer 1 with a socket address
1315            let peers0: Vec<(_, Address)> = vec![
1316                (peer0.public_key(), Address::Symmetric(socket0)),
1317                (peer1.public_key(), Address::Symmetric(socket1)),
1318            ];
1319            oracle0.update(0, peers0.try_into().unwrap()).await;
1320
1321            let (_sender0, mut receiver0) =
1322                network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1323            network0.start();
1324
1325            // Create peer 1 with allow_private_ips=false
1326            let mut config1 = Config::test(peer1.clone(), socket1, 1_024 * 1_024);
1327            config1.allow_private_ips = false; // This should prevent dialing the private IP
1328            let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
1329
1330            // Peer 1 knows about peer 0 with a DNS address that resolves to private IP
1331            let peers1: Vec<(_, Address)> = vec![
1332                (
1333                    peer0.public_key(),
1334                    Address::Asymmetric {
1335                        ingress: Ingress::Dns {
1336                            host: hostname!("peer-0.local"),
1337                            port: socket0.port(),
1338                        },
1339                        egress: socket0,
1340                    },
1341                ),
1342                (peer1.public_key(), Address::Symmetric(socket1)),
1343            ];
1344            oracle1.update(0, peers1.try_into().unwrap()).await;
1345
1346            let (mut sender1, _receiver1) =
1347                network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1348            network1.start();
1349
1350            // Wait for a period during which peer 1 would normally connect
1351            context.sleep(Duration::from_secs(5)).await;
1352
1353            // Try to send from peer 1 - should not reach anyone since private IPs are blocked
1354            let sent = sender1
1355                .send(Recipients::All, peer1.public_key().as_ref(), true)
1356                .await
1357                .unwrap();
1358            assert!(
1359                sent.is_empty(),
1360                "peer 1 should not have connected to peer 0 (private IP)"
1361            );
1362
1363            // Verify peer 0 received nothing from peer 1
1364            select! {
1365                msg = receiver0.recv() => {
1366                    panic!("peer 0 should not have received any message, got: {msg:?}");
1367                },
1368                _ = context.sleep(Duration::from_secs(1)) => {
1369                    // Expected: timeout with no message
1370                }
1371            }
1372        });
1373    }
1374
1375    #[test_traced]
1376    fn test_dns_mixed_ips_connectivity() {
1377        // Test that peers can connect even when DNS resolves to multiple IPs
1378        // where most are unreachable. The dialer randomly picks an IP, so
1379        // eventually it should pick the reachable one.
1380        //
1381        // Run over 25 seeds to ensure we exercise the random IP selection.
1382        for seed in 0..25 {
1383            let base_port = 3500;
1384
1385            let cfg = deterministic::Config::default()
1386                .with_seed(seed)
1387                .with_timeout(Some(Duration::from_secs(120)));
1388            let executor = deterministic::Runner::new(cfg);
1389            executor.start(|context| async move {
1390                let peer0 = ed25519::PrivateKey::from_seed(0);
1391                let peer1 = ed25519::PrivateKey::from_seed(1);
1392
1393                let good_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
1394                let socket0 = SocketAddr::new(good_ip, base_port);
1395                let socket1 = SocketAddr::new(good_ip, base_port + 1);
1396
1397                // Register DNS mappings with 3 bad IPs and 1 good IP for both peers
1398                let mut all_ips0: Vec<IpAddr> = (1..=3)
1399                    .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 100 + i)))
1400                    .collect();
1401                all_ips0.push(good_ip);
1402                context.resolver_register("peer-0.local", Some(all_ips0));
1403
1404                let mut all_ips1: Vec<IpAddr> = (1..=3)
1405                    .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 110 + i)))
1406                    .collect();
1407                all_ips1.push(good_ip);
1408                context.resolver_register("peer-1.local", Some(all_ips1));
1409
1410                // Create peer addresses - both peers use DNS with mixed IPs
1411                let peers: Vec<(_, Address)> = vec![
1412                    (
1413                        peer0.public_key(),
1414                        Address::Asymmetric {
1415                            ingress: Ingress::Dns {
1416                                host: hostname!("peer-0.local"),
1417                                port: base_port,
1418                            },
1419                            egress: socket0,
1420                        },
1421                    ),
1422                    (
1423                        peer1.public_key(),
1424                        Address::Asymmetric {
1425                            ingress: Ingress::Dns {
1426                                host: hostname!("peer-1.local"),
1427                                port: base_port + 1,
1428                            },
1429                            egress: socket1,
1430                        },
1431                    ),
1432                ];
1433
1434                // Create peer 0
1435                let config0 = Config::test(peer0.clone(), socket0, 1_024 * 1_024);
1436                let (mut network0, mut oracle0) =
1437                    Network::new(context.with_label("peer_0"), config0);
1438                oracle0.update(0, peers.clone().try_into().unwrap()).await;
1439                let (_sender0, mut receiver0) =
1440                    network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1441                network0.start();
1442
1443                // Create peer 1
1444                let config1 = Config::test(peer1.clone(), socket1, 1_024 * 1_024);
1445                let (mut network1, mut oracle1) =
1446                    Network::new(context.with_label("peer_1"), config1);
1447                oracle1.update(0, peers.clone().try_into().unwrap()).await;
1448                let (mut sender1, _receiver1) =
1449                    network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1450                network1.start();
1451
1452                // Wait for peers to connect (may take multiple attempts due to random IP selection)
1453                let pk0 = peer0.public_key();
1454                loop {
1455                    let sent = sender1
1456                        .send(
1457                            Recipients::One(pk0.clone()),
1458                            peer1.public_key().as_ref(),
1459                            true,
1460                        )
1461                        .await
1462                        .unwrap();
1463                    if !sent.is_empty() {
1464                        break;
1465                    }
1466                    context.sleep(Duration::from_millis(100)).await;
1467                }
1468
1469                // Verify peer 0 received the message
1470                let (sender, msg) = receiver0.recv().await.unwrap();
1471                assert_eq!(sender, peer1.public_key());
1472                assert_eq!(msg.as_ref(), peer1.public_key().as_ref());
1473            });
1474        }
1475    }
1476
1477    #[test_traced]
1478    fn test_many_peer_restart_with_new_address() {
1479        let base_port = 9500;
1480        let n = 5;
1481
1482        let executor = deterministic::Runner::default();
1483        executor.start(|context| async move {
1484            // Create peers
1485            let peers: Vec<_> = (0..n)
1486                .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1487                .collect();
1488            let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1489
1490            // Track senders/receivers/oracles/handles across restarts
1491            let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1492            let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1493            let mut oracles: Vec<Option<Oracle<_>>> = (0..n).map(|_| None).collect();
1494            let mut handles: Vec<Option<commonware_runtime::Handle<()>>> =
1495                (0..n).map(|_| None).collect();
1496
1497            // Track port allocations (updated on restart)
1498            let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1499
1500            // Create initial peer set with addresses
1501            let peer_set: Vec<(_, Address)> = addresses
1502                .iter()
1503                .enumerate()
1504                .map(|(i, pk)| {
1505                    (
1506                        pk.clone(),
1507                        Address::Symmetric(SocketAddr::new(
1508                            IpAddr::V4(Ipv4Addr::LOCALHOST),
1509                            ports[i],
1510                        )),
1511                    )
1512                })
1513                .collect();
1514
1515            // Create networks for all peers
1516            for (i, peer) in peers.iter().enumerate() {
1517                let peer_context = context.with_label(&format!("peer_{i}"));
1518
1519                let config = Config::test(
1520                    peer.clone(),
1521                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
1522                    MAX_MESSAGE_SIZE,
1523                );
1524                let (mut network, mut oracle) =
1525                    Network::new(peer_context.with_label("network"), config);
1526
1527                // Register peer set
1528                oracle.update(0, peer_set.clone().try_into().unwrap()).await;
1529
1530                let (sender, receiver) =
1531                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1532                senders[i] = Some(sender);
1533                receivers[i] = Some(receiver);
1534                oracles[i] = Some(oracle);
1535
1536                let handle = network.start();
1537                handles[i] = Some(handle);
1538            }
1539
1540            // Wait for full connectivity (each peer can send to all others)
1541            for (i, sender) in senders.iter_mut().enumerate() {
1542                let sender = sender.as_mut().unwrap();
1543                loop {
1544                    let sent = sender
1545                        .send(Recipients::All, peers[i].public_key().as_ref(), true)
1546                        .await
1547                        .unwrap();
1548                    if sent.len() == n - 1 {
1549                        break;
1550                    }
1551                    context.sleep(Duration::from_millis(100)).await;
1552                }
1553            }
1554
1555            // Verify each peer can receive from all others
1556            for receiver in receivers.iter_mut() {
1557                let receiver = receiver.as_mut().unwrap();
1558                let mut received = HashSet::new();
1559                while received.len() < n - 1 {
1560                    let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1561                    assert_eq!(sender.as_ref(), message.as_ref());
1562                    received.insert(sender);
1563                }
1564            }
1565
1566            // Restart each non-first peer with a new port, multiple rounds
1567            let mut restart_counter = 0u16;
1568            for round in 0..3 {
1569                for restart_peer_idx in 1..n {
1570                    // Allocate a new unique port
1571                    restart_counter += 1;
1572                    let new_port = base_port + 100 + restart_counter;
1573                    ports[restart_peer_idx] = new_port;
1574
1575                    // Abort the peer's network
1576                    if let Some(handle) = handles[restart_peer_idx].take() {
1577                        handle.abort();
1578                    }
1579                    senders[restart_peer_idx] = None;
1580                    receivers[restart_peer_idx] = None;
1581                    oracles[restart_peer_idx] = None;
1582
1583                    // Create updated peer set with new port
1584                    let updated_peer_set: Vec<(_, Address)> = addresses
1585                        .iter()
1586                        .enumerate()
1587                        .map(|(i, pk)| {
1588                            (
1589                                pk.clone(),
1590                                Address::Symmetric(SocketAddr::new(
1591                                    IpAddr::V4(Ipv4Addr::LOCALHOST),
1592                                    ports[i],
1593                                )),
1594                            )
1595                        })
1596                        .collect();
1597
1598                    // Update oracle on all running peers
1599                    for oracle in oracles.iter_mut().flatten() {
1600                        oracle
1601                            .update(
1602                                (round * (n - 1) + restart_peer_idx) as u64,
1603                                updated_peer_set.clone().try_into().unwrap(),
1604                            )
1605                            .await;
1606                    }
1607
1608                    // Restart the peer with new port
1609                    let peer_context =
1610                        context.with_label(&format!("peer_{restart_peer_idx}_round_{round}"));
1611                    let config = Config::test(
1612                        peers[restart_peer_idx].clone(),
1613                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), new_port),
1614                        MAX_MESSAGE_SIZE,
1615                    );
1616                    let (mut network, mut oracle) =
1617                        Network::new(peer_context.with_label("network"), config);
1618
1619                    oracle
1620                        .update(
1621                            (round * (n - 1) + restart_peer_idx) as u64,
1622                            updated_peer_set.clone().try_into().unwrap(),
1623                        )
1624                        .await;
1625
1626                    let (sender, receiver) = network.register(
1627                        0,
1628                        Quota::per_second(NZU32!(100)),
1629                        DEFAULT_MESSAGE_BACKLOG,
1630                    );
1631                    senders[restart_peer_idx] = Some(sender);
1632                    receivers[restart_peer_idx] = Some(receiver);
1633                    oracles[restart_peer_idx] = Some(oracle);
1634
1635                    let handle = network.start();
1636                    handles[restart_peer_idx] = Some(handle);
1637
1638                    // Wait for the restarted peer to reconnect to all others
1639                    let restarted_sender = senders[restart_peer_idx].as_mut().unwrap();
1640                    loop {
1641                        let sent = restarted_sender
1642                            .send(
1643                                Recipients::All,
1644                                peers[restart_peer_idx].public_key().as_ref(),
1645                                true,
1646                            )
1647                            .await
1648                            .unwrap();
1649                        if sent.len() == n - 1 {
1650                            break;
1651                        }
1652                        context.sleep(Duration::from_millis(100)).await;
1653                    }
1654
1655                    // Verify all other peers can send to the restarted peer
1656                    for i in 0..n {
1657                        if i == restart_peer_idx {
1658                            continue;
1659                        }
1660                        let sender = senders[i].as_mut().unwrap();
1661                        loop {
1662                            let sent = sender
1663                                .send(
1664                                    Recipients::One(addresses[restart_peer_idx].clone()),
1665                                    peers[i].public_key().as_ref(),
1666                                    true,
1667                                )
1668                                .await
1669                                .unwrap();
1670                            if sent.len() == 1 {
1671                                break;
1672                            }
1673                            context.sleep(Duration::from_millis(100)).await;
1674                        }
1675                    }
1676
1677                    // Verify the restarted peer receives from all others
1678                    let restarted_receiver = receivers[restart_peer_idx].as_mut().unwrap();
1679                    let mut received = HashSet::new();
1680                    while received.len() < n - 1 {
1681                        let (sender, message): (ed25519::PublicKey, _) =
1682                            restarted_receiver.recv().await.unwrap();
1683                        assert_eq!(sender.as_ref(), message.as_ref());
1684                        received.insert(sender);
1685                    }
1686                }
1687            }
1688
1689            assert_no_rate_limiting(&context);
1690        });
1691    }
1692
1693    #[test_traced]
1694    fn test_simultaneous_peer_restart() {
1695        let base_port = 9700;
1696        let n = 5;
1697
1698        let executor = deterministic::Runner::default();
1699        executor.start(|context| async move {
1700            // Create peers
1701            let peers: Vec<_> = (0..n)
1702                .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1703                .collect();
1704            let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1705
1706            // Track port allocations (updated on restart)
1707            let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1708
1709            // Create initial peer set with addresses
1710            let peer_set: Vec<(_, Address)> = addresses
1711                .iter()
1712                .enumerate()
1713                .map(|(i, pk)| {
1714                    (
1715                        pk.clone(),
1716                        Address::Symmetric(SocketAddr::new(
1717                            IpAddr::V4(Ipv4Addr::LOCALHOST),
1718                            ports[i],
1719                        )),
1720                    )
1721                })
1722                .collect();
1723
1724            // Track senders/receivers/oracles/handles across restarts
1725            let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1726            let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1727            let mut oracles: Vec<Option<Oracle<_>>> = (0..n).map(|_| None).collect();
1728            let mut handles: Vec<Option<commonware_runtime::Handle<()>>> =
1729                (0..n).map(|_| None).collect();
1730
1731            // Create networks for all peers
1732            for (i, peer) in peers.iter().enumerate() {
1733                let peer_context = context.with_label(&format!("peer_{i}"));
1734
1735                let config = Config::test(
1736                    peer.clone(),
1737                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
1738                    MAX_MESSAGE_SIZE,
1739                );
1740                let (mut network, mut oracle) =
1741                    Network::new(peer_context.with_label("network"), config);
1742
1743                // Register peer set
1744                oracle.update(0, peer_set.clone().try_into().unwrap()).await;
1745
1746                let (sender, receiver) =
1747                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1748                senders[i] = Some(sender);
1749                receivers[i] = Some(receiver);
1750                oracles[i] = Some(oracle);
1751
1752                let handle = network.start();
1753                handles[i] = Some(handle);
1754            }
1755
1756            // Wait for full connectivity
1757            for (i, sender) in senders.iter_mut().enumerate() {
1758                let sender = sender.as_mut().unwrap();
1759                loop {
1760                    let sent = sender
1761                        .send(Recipients::All, peers[i].public_key().as_ref(), true)
1762                        .await
1763                        .unwrap();
1764                    if sent.len() == n - 1 {
1765                        break;
1766                    }
1767                    context.sleep(Duration::from_millis(100)).await;
1768                }
1769            }
1770
1771            // Verify each peer can receive from all others
1772            for receiver in receivers.iter_mut() {
1773                let receiver = receiver.as_mut().unwrap();
1774                let mut received = HashSet::new();
1775                while received.len() < n - 1 {
1776                    let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1777                    assert_eq!(sender.as_ref(), message.as_ref());
1778                    received.insert(sender);
1779                }
1780            }
1781
1782            // Shutdown all peers except peer 0 simultaneously.
1783            //
1784            // We keep peer 0 alive to exercise the case where multiple
1785            // peers churn at once.
1786            let restart_peers: Vec<usize> = (1..n).collect();
1787            for &idx in &restart_peers {
1788                if let Some(handle) = handles[idx].take() {
1789                    handle.abort();
1790                }
1791                senders[idx] = None;
1792                receivers[idx] = None;
1793                oracles[idx] = None;
1794                ports[idx] = base_port + 100 + idx as u16;
1795            }
1796
1797            // Wait for connections to be detected as closed
1798            context.sleep(Duration::from_secs(2)).await;
1799
1800            // Create updated peer set with new ports
1801            let updated_peer_set: Vec<(_, Address)> = addresses
1802                .iter()
1803                .enumerate()
1804                .map(|(i, pk)| {
1805                    (
1806                        pk.clone(),
1807                        Address::Symmetric(SocketAddr::new(
1808                            IpAddr::V4(Ipv4Addr::LOCALHOST),
1809                            ports[i],
1810                        )),
1811                    )
1812                })
1813                .collect();
1814
1815            // Update oracle on peer 0 (the only running peer)
1816            oracles[0]
1817                .as_mut()
1818                .unwrap()
1819                .update(1, updated_peer_set.clone().try_into().unwrap())
1820                .await;
1821
1822            // Restart all shutdown peers with new ports
1823            for &idx in &restart_peers {
1824                let peer_context = context.with_label(&format!("peer_{idx}_restarted"));
1825                let config = Config::test(
1826                    peers[idx].clone(),
1827                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[idx]),
1828                    MAX_MESSAGE_SIZE,
1829                );
1830                let (mut network, mut oracle) =
1831                    Network::new(peer_context.with_label("network"), config);
1832
1833                oracle
1834                    .update(1, updated_peer_set.clone().try_into().unwrap())
1835                    .await;
1836
1837                let (sender, receiver) =
1838                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1839                senders[idx] = Some(sender);
1840                receivers[idx] = Some(receiver);
1841                oracles[idx] = Some(oracle);
1842
1843                let handle = network.start();
1844                handles[idx] = Some(handle);
1845            }
1846
1847            // Wait for full connectivity after restart
1848            for (i, sender) in senders.iter_mut().enumerate() {
1849                let sender = sender.as_mut().unwrap();
1850                loop {
1851                    let sent = sender
1852                        .send(Recipients::All, peers[i].public_key().as_ref(), true)
1853                        .await
1854                        .unwrap();
1855                    if sent.len() == n - 1 {
1856                        break;
1857                    }
1858                    context.sleep(Duration::from_millis(100)).await;
1859                }
1860            }
1861
1862            // Verify all peers can receive from all others after restart
1863            for receiver in receivers.iter_mut() {
1864                let receiver = receiver.as_mut().unwrap();
1865                let mut received = HashSet::new();
1866                while received.len() < n - 1 {
1867                    let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1868                    assert_eq!(sender.as_ref(), message.as_ref());
1869                    received.insert(sender);
1870                }
1871            }
1872
1873            assert_no_rate_limiting(&context);
1874        });
1875    }
1876    #[test_traced]
1877    fn test_operations_after_shutdown_do_not_panic() {
1878        let executor = deterministic::Runner::default();
1879        executor.start(|context| async move {
1880            let peer = ed25519::PrivateKey::from_seed(0);
1881            let address = peer.public_key();
1882
1883            let peer_context = context.with_label("peer");
1884            let config = Config::test(
1885                peer.clone(),
1886                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200),
1887                MAX_MESSAGE_SIZE,
1888            );
1889            let (mut network, mut oracle) =
1890                Network::new(peer_context.with_label("network"), config);
1891
1892            // Register channel and peer set
1893            let (mut sender, _receiver) =
1894                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1895            let peer_addr =
1896                Address::Symmetric(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200));
1897            let peers: Map<ed25519::PublicKey, Address> =
1898                vec![(address.clone(), peer_addr)].try_into().unwrap();
1899            oracle.update(0, peers.clone()).await;
1900
1901            // Start and immediately abort the network
1902            let handle = network.start();
1903            handle.abort();
1904
1905            // Wait for shutdown to propagate
1906            context.sleep(Duration::from_millis(100)).await;
1907
1908            // Oracle operations should not panic even after shutdown
1909            oracle.update(1, peers.clone()).await;
1910            let _ = oracle.peer_set(0).await;
1911            let _ = oracle.subscribe().await;
1912            oracle.block(address.clone()).await;
1913
1914            // Sender operations should not panic even after shutdown
1915            let sent = sender
1916                .send(Recipients::All, address.as_ref(), true)
1917                .await
1918                .unwrap();
1919            assert!(sent.is_empty());
1920        });
1921    }
1922
1923    fn clean_shutdown(seed: u64) {
1924        let cfg = deterministic::Config::default()
1925            .with_seed(seed)
1926            .with_timeout(Some(Duration::from_secs(30)));
1927        let executor = deterministic::Runner::new(cfg);
1928        executor.start(|context| async move {
1929            let peer = ed25519::PrivateKey::from_seed(0);
1930
1931            let peer_context = context.with_label("peer");
1932            let config = Config::test(
1933                peer.clone(),
1934                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200),
1935                MAX_MESSAGE_SIZE,
1936            );
1937            let (mut network, mut oracle) =
1938                Network::new(peer_context.with_label("network"), config);
1939
1940            // Register channel and peer set
1941            let (_, _) =
1942                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1943            let peer_addr =
1944                Address::Symmetric(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200));
1945            let peers: Map<ed25519::PublicKey, Address> =
1946                vec![(peer.public_key(), peer_addr)].try_into().unwrap();
1947            oracle.update(0, peers).await;
1948
1949            // Start the network
1950            let handle = network.start();
1951
1952            // Allow tasks to start
1953            context.sleep(Duration::from_millis(100)).await;
1954
1955            // Count running tasks under the network prefix
1956            let running_before = count_running_tasks(&context, "peer_network");
1957            assert!(
1958                running_before > 0,
1959                "at least one network task should be running"
1960            );
1961
1962            // Abort the network
1963            handle.abort();
1964            let _ = handle.await;
1965
1966            // Give the runtime a tick to process aborts
1967            context.sleep(Duration::from_millis(100)).await;
1968
1969            // Verify all network tasks are stopped
1970            let running_after = count_running_tasks(&context, "peer_network");
1971            assert_eq!(
1972                running_after, 0,
1973                "all network tasks should be stopped, but {running_after} still running"
1974            );
1975        });
1976    }
1977
1978    #[test_traced]
1979    fn test_clean_shutdown() {
1980        for seed in 0..25 {
1981            clean_shutdown(seed);
1982        }
1983    }
1984}