Skip to main content

commonware_p2p/authenticated/lookup/
mod.rs

1//! Communicate with a fixed set of authenticated peers with known addresses over encrypted connections.
2//!
3//! `lookup` provides multiplexed communication between fully-connected peers
4//! identified by a developer-specified cryptographic identity (i.e. BLS, ed25519, etc.).
5//! Unlike `discovery`, peers in `lookup` don't use a discovery mechanism to find each other;
6//! each peer's address is supplied by the application layer.
7//!
8//! # Features
9//!
10//! - Configurable Cryptography Scheme for Peer Identities (BLS, ed25519, etc.)
11//! - Multiplexing With Configurable Rate Limiting Per Channel and Send Prioritization
12//!
13//! # Design
14//!
15//! ## Discovery
16//!
17//! This module operates under the assumption that all peers are aware of and synchronized on
18//! the composition of peer sets at specific, user-provided indices (`u64`). Each index maps to a
19//! list of peer `PublicKey`/`SocketAddr` pairs (`(u64, Vec<(PublicKey, SocketAddr)>)`).
20//!
21//! On startup, the application supplies the initial set of peers. The [`Oracle`] implements
22//! [`AddressableManager`](crate::AddressableManager) which provides two ways to update peer addresses:
23//!
24//! - [`AddressableManager::track`](crate::AddressableManager::track): Register a new peer set at a
25//!   monotonically increasing index. Use this when the peer set composition changes (peers added/removed).
26//! - [`AddressableManager::overwrite`](crate::AddressableManager::overwrite): Update multiple
27//!   peers' addresses in-place without creating a new peer set. Use this when only peer IPs change but
28//!   the peer set composition stays the same. Untracked or unchanged peers are silently skipped (so the application doesn't
29//!   need to track what its last submitted peer set was).
30//!
31//! Any inbound connection attempts from an IP address that is not in the union of all registered
32//! peer sets will be rejected.
33//!
34//! ## Messages
35//!
36//! Application-level data is exchanged using the `Data` message type. This structure contains:
37//! - `channel`: A `u32` identifier used to route the message to the correct application handler.
38//! - `message`: The arbitrary application payload as `IoBuf`.
39//!
40//! The size of the `message` bytes must not exceed the configured
41//! `max_message_size`. If it does, the sending operation will fail with
42//! [Error::MessageTooLarge]. Messages can be sent with `priority`, allowing certain
43//! communications to potentially bypass lower-priority messages waiting in send queues across all
44//! channels. Each registered channel ([Sender], [Receiver]) handles its own message queuing
45//! and rate limiting.
46//!
47//! ## Compression
48//!
49//! Stream compression is not provided at the transport layer to avoid inadvertently
50//! enabling known attacks such as BREACH and CRIME. These attacks exploit the interaction
51//! between compression and encryption by analyzing patterns in the resulting data.
52//! By compressing secrets alongside attacker-controlled content, these attacks can infer
53//! sensitive information through compression ratio analysis. Applications that choose
54//! to compress data should do so with full awareness of these risks and implement
55//! appropriate mitigations (such as ensuring no attacker-controlled data is compressed
56//! alongside sensitive information).
57//!
58//! ## Rate Limiting
59//!
60//! There are five primary rate limits:
61//!
62//! - `max_concurrent_handshakes`: The maximum number of concurrent handshake attempts allowed.
63//! - `allowed_handshake_rate_per_ip`: The rate limit for handshake attempts originating from a single IP address.
64//! - `allowed_handshake_rate_per_subnet`: The rate limit for handshake attempts originating from a single IP subnet.
65//! - `allowed_connection_rate_per_peer`: The rate limit for connections to a single peer (incoming or outgoing).
66//! - `rate` (per channel): The rate limit for messages sent on a single channel.
67//!
68//! _Users should consider these rate limits as best-effort protection against moderate abuse. Targeted abuse (e.g. DDoS)
69//! must be mitigated with an external proxy (that limits inbound connection attempts to authorized IPs)._
70//!
71//! ## Message Delivery
72//!
73//! Outgoing messages are dropped when a peer's send buffer is full, preventing slow peers
74//! from blocking sends to other peers. Incoming messages are dropped when the application's
75//! receive buffer is full, ensuring ping messages continue to flow and connections remain
76//! healthy.
77//!
78//! # Example
79//!
80//! ```rust
81//! use commonware_p2p::{authenticated::lookup::{self, Network}, Address, AddressableManager, Sender, Recipients};
82//! use commonware_cryptography::{ed25519, Signer, PrivateKey as _, PublicKey as _, };
83//! use commonware_runtime::{deterministic, IoBuf, Metrics, Quota, Runner, Spawner};
84//! use commonware_utils::{NZU32, ordered::Map};
85//! use std::net::{IpAddr, Ipv4Addr, SocketAddr};
86//!
87//! // Configure context
88//! let runtime_cfg = deterministic::Config::default();
89//! let runner = deterministic::Runner::new(runtime_cfg);
90//!
91//! // Generate identity
92//! //
93//! // In production, the signer should be generated from a secure source of entropy.
94//! let my_sk = ed25519::PrivateKey::from_seed(0);
95//! let my_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
96//!
97//! // Generate peers
98//! //
99//! // In production, peer identities will be provided by some external source of truth
100//! // (like the staking set of a blockchain).
101//! let peer1 = ed25519::PrivateKey::from_seed(1).public_key();
102//! let peer1_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3001);
103//! let peer2 = ed25519::PrivateKey::from_seed(2).public_key();
104//! let peer2_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3002);
105//! let peer3 = ed25519::PrivateKey::from_seed(3).public_key();
106//! let peer3_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3003);
107//!
108//! // Configure namespace
109//! //
110//! // In production, use a unique application namespace to prevent cryptographic replay attacks.
111//! let application_namespace = b"my-app-namespace";
112//!
113//! // Configure network
114//! //
115//! // In production, use a more conservative configuration like `Config::recommended`.
116//! const MAX_MESSAGE_SIZE: u32 = 1_024; // 1KB
117//! let p2p_cfg = lookup::Config::local(
118//!     my_sk.clone(),
119//!     application_namespace,
120//!     my_addr,
121//!     MAX_MESSAGE_SIZE,
122//! );
123//!
124//! // Start context
125//! runner.start(|context| async move {
126//!     // Initialize network
127//!     let (mut network, mut oracle) = Network::new(context.with_label("network"), p2p_cfg);
128//!
129//!     // Register authorized peers
130//!     //
131//!     // In production, this would be updated as new peer sets are created (like when
132//!     // the composition of a validator set changes).
133//!     let peers: Map<_, Address> = [
134//!         (my_sk.public_key(), my_addr.into()),
135//!         (peer1, peer1_addr.into()),
136//!         (peer2, peer2_addr.into()),
137//!         (peer3, peer3_addr.into()),
138//!     ].try_into().unwrap();
139//!     oracle.track(0, peers).await;
140//!
141//!     // Register some channel
142//!     const MAX_MESSAGE_BACKLOG: usize = 128;
143//!     let (mut sender, receiver) = network.register(
144//!         0,
145//!         Quota::per_second(NZU32!(1)),
146//!         MAX_MESSAGE_BACKLOG,
147//!     );
148//!
149//!     // Run network
150//!     network.start();
151//!
152//!     // Example: Use sender
153//!     let _ = sender.send(Recipients::All, IoBuf::from(b"hello"), false).await;
154//!
155//!     // Graceful shutdown (stops all spawned tasks)
156//!     context.stop(0, None).await.unwrap();
157//! });
158//! ```
159
160mod actors;
161mod channels;
162mod config;
163mod metrics;
164mod network;
165mod types;
166
167use thiserror::Error;
168
169/// Errors that can occur when interacting with the network.
170#[derive(Error, Debug)]
171pub enum Error {
172    #[error("message too large: {0}")]
173    MessageTooLarge(usize),
174    #[error("network closed")]
175    NetworkClosed,
176}
177
178pub use actors::tracker::Oracle;
179pub use channels::{Receiver, Sender};
180pub use config::Config;
181pub use network::Network;
182
183#[cfg(test)]
184mod tests {
185    use super::*;
186    use crate::{
187        Address, AddressableManager, Blocker, Ingress, Provider, Receiver, Recipients, Sender,
188    };
189    use commonware_cryptography::{ed25519, Signer as _};
190    use commonware_macros::{select, test_group, test_traced};
191    use commonware_runtime::{
192        count_running_tasks, deterministic, tokio, BufferPooler, Clock, Metrics,
193        Network as RNetwork, Quota, Resolver, Runner, Spawner,
194    };
195    use commonware_utils::{
196        channel::mpsc,
197        hostname,
198        ordered::{Map, Set},
199        Hostname, TryCollect, NZU32,
200    };
201    use rand_core::{CryptoRngCore, RngCore};
202    use std::{
203        collections::HashSet,
204        net::{IpAddr, Ipv4Addr, SocketAddr},
205        time::Duration,
206    };
207
/// Recipient-selection strategy exercised by `run_network`.
#[derive(Clone, Copy)]
enum Mode {
    /// Send once using `Recipients::All`.
    All,
    /// Send once using `Recipients::Some`, listing every other peer explicitly.
    Some,
    /// Send to every other peer individually using `Recipients::One`.
    One,
}
214
215    const MAX_MESSAGE_SIZE: u32 = 1_024 * 1_024; // 1MB
216    const DEFAULT_MESSAGE_BACKLOG: usize = 128;
217
218    /// Ensure no message rate limiting occurred.
219    ///
220    /// If a message is rate limited, it would be formatted as:
221    ///
222    /// ```text
223    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="data_0"} 1
224    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="ping"} 1
225    /// ```
226    fn assert_no_rate_limiting(context: &impl Metrics) {
227        let metrics = context.encode();
228        assert!(
229            !metrics.contains("messages_rate_limited_total{"),
230            "no messages should be rate limited: {metrics}"
231        );
232    }
233
234    /// Test connectivity between `n` peers.
235    ///
236    /// We set a unique `base_port` for each test to avoid "address already in use"
237    /// errors when tests are run immediately after each other.
238    async fn run_network(
239        context: impl Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
240        max_message_size: u32,
241        base_port: u16,
242        n: usize,
243        mode: Mode,
244    ) {
245        // Create peers
246        let mut peers_and_sks = Vec::new();
247        for i in 0..n {
248            let private_key = ed25519::PrivateKey::from_seed(i as u64);
249            let public_key = private_key.public_key();
250            let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
251            peers_and_sks.push((private_key, public_key, address));
252        }
253        let peers = peers_and_sks
254            .iter()
255            .map(|(_, pub_key, addr)| (pub_key.clone(), (*addr).into()))
256            .collect::<Vec<_>>();
257
258        // Create networks
259        let (complete_sender, mut complete_receiver) = mpsc::channel(peers.len());
260        for (i, (private_key, public_key, address)) in peers_and_sks.iter().enumerate() {
261            let public_key = public_key.clone();
262
263            // Create peer context
264            let context = context.with_label(&format!("peer_{i}"));
265
266            // Create network
267            let config = Config::test(private_key.clone(), *address, max_message_size);
268            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
269
270            // Register peers
271            oracle.track(0, peers.clone().try_into().unwrap()).await;
272
273            // Register basic application
274            let (mut sender, mut receiver) =
275                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
276
277            // Wait to connect to all peers, and then send messages to everyone
278            network.start();
279
280            // Send/Receive messages
281            context.with_label("agent").spawn({
282                let complete_sender = complete_sender.clone();
283                let peers = peers.clone();
284                move |context| async move {
285                    // Wait for all peers to send their identity
286                    let receiver = context.with_label("receiver").spawn(move |_| async move {
287                        // Wait for all peers to send their identity
288                        let mut received = HashSet::new();
289                        while received.len() < n - 1 {
290                            // Ensure message equals sender identity
291                            let (sender, message) = receiver.recv().await.unwrap();
292                            assert_eq!(message, sender.as_ref());
293
294                            // Add to received set
295                            received.insert(sender);
296                        }
297                        complete_sender.send(()).await.unwrap();
298
299                        // Process messages until all finished (or else sender loops could get stuck as a peer may drop)
300                        loop {
301                            receiver.recv().await.unwrap();
302                        }
303                    });
304
305                    // Send identity to all peers
306                    let sender = context
307                        .with_label("sender")
308                        .spawn(move |context| async move {
309                            // Get all peers not including self
310                            let mut recipients: Vec<_> = peers
311                                .iter()
312                                .enumerate()
313                                .filter(|(j, _)| i != *j)
314                                .map(|(_, (pk, _))| pk.clone())
315                                .collect();
316                            recipients.sort();
317
318                            // Loop forever to account for unexpected message drops
319                            loop {
320                                match mode {
321                                    Mode::One => {
322                                        for pub_key in &recipients {
323                                            // Loop until success
324                                            loop {
325                                                let sent = sender
326                                                    .send(
327                                                        Recipients::One(pub_key.clone()),
328                                                        public_key.as_ref().to_vec(),
329                                                        true,
330                                                    )
331                                                    .await
332                                                    .unwrap();
333                                                if sent.len() != 1 {
334                                                    context.sleep(Duration::from_millis(100)).await;
335                                                    continue;
336                                                }
337                                                assert_eq!(&sent[0], pub_key);
338                                                break;
339                                            }
340                                        }
341                                    }
342                                    Mode::Some | Mode::All => {
343                                        // Loop until all peer sends successful
344                                        loop {
345                                            let mut sent = sender
346                                                .send(
347                                                    match mode {
348                                                        Mode::Some => {
349                                                            Recipients::Some(recipients.clone())
350                                                        }
351                                                        Mode::All => Recipients::All,
352                                                        _ => unreachable!(),
353                                                    },
354                                                    public_key.as_ref().to_vec(),
355                                                    true,
356                                                )
357                                                .await
358                                                .unwrap();
359                                            if sent.len() != recipients.len() {
360                                                context.sleep(Duration::from_millis(100)).await;
361                                                continue;
362                                            }
363
364                                            // Compare to expected
365                                            sent.sort();
366                                            assert_eq!(sent, recipients);
367                                            break;
368                                        }
369                                    }
370                                };
371
372                                // Sleep to avoid busy loop
373                                context.sleep(Duration::from_secs(10)).await;
374                            }
375                        });
376
377                    // Neither task should exit
378                    select! {
379                        receiver = receiver => {
380                            panic!("receiver exited: {receiver:?}");
381                        },
382                        sender = sender => {
383                            panic!("sender exited: {sender:?}");
384                        },
385                    }
386                }
387            });
388        }
389
390        // Wait for all peers to finish
391        for _ in 0..n {
392            complete_receiver.recv().await.unwrap();
393        }
394
395        // Ensure no message rate limiting occurred
396        assert_no_rate_limiting(&context);
397    }
398
399    fn run_deterministic_test(seed: u64, mode: Mode) {
400        // Configure test
401        const NUM_PEERS: usize = 25;
402        const BASE_PORT: u16 = 3000;
403
404        // Run first instance
405        let executor = deterministic::Runner::seeded(seed);
406        let state = executor.start(|context| async move {
407            run_network(
408                context.clone(),
409                MAX_MESSAGE_SIZE,
410                BASE_PORT,
411                NUM_PEERS,
412                mode,
413            )
414            .await;
415            context.auditor().state()
416        });
417
418        // Compare result to second instance
419        let executor = deterministic::Runner::seeded(seed);
420        let state2 = executor.start(|context| async move {
421            run_network(
422                context.clone(),
423                MAX_MESSAGE_SIZE,
424                BASE_PORT,
425                NUM_PEERS,
426                mode,
427            )
428            .await;
429            context.auditor().state()
430        });
431        assert_eq!(state, state2);
432    }
433
434    #[test_group("slow")]
435    #[test_traced]
436    fn test_determinism_one() {
437        for i in 0..10 {
438            run_deterministic_test(i, Mode::One);
439        }
440    }
441
442    #[test_group("slow")]
443    #[test_traced]
444    fn test_determinism_some() {
445        for i in 0..10 {
446            run_deterministic_test(i, Mode::Some);
447        }
448    }
449
450    #[test_group("slow")]
451    #[test_traced]
452    fn test_determinism_all() {
453        for i in 0..10 {
454            run_deterministic_test(i, Mode::All);
455        }
456    }
457
458    #[test_traced]
459    fn test_tokio_connectivity() {
460        let executor = tokio::Runner::default();
461        executor.start(|context| async move {
462            let base_port = 4000;
463            let n = 10;
464            run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
465        });
466    }
467
468    #[test_traced]
469    fn test_multi_index_oracle() {
470        // Configure test
471        let base_port = 3000;
472        let n: usize = 10;
473
474        // Initialize context
475        let executor = deterministic::Runner::default();
476        executor.start(|context| async move {
477            // Create peers
478            let mut peers_and_sks = Vec::new();
479            for i in 0..n {
480                let sk = ed25519::PrivateKey::from_seed(i as u64);
481                let pk = sk.public_key();
482                let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
483                peers_and_sks.push((sk, pk, addr));
484            }
485            let peers = peers_and_sks
486                .iter()
487                .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
488                .collect::<Vec<_>>();
489
490            // Create networks
491            let mut waiters = Vec::new();
492            for (i, (peer_sk, peer_pk, peer_addr)) in peers_and_sks.iter().enumerate() {
493                // Create peer context
494                let context = context.with_label(&format!("peer_{i}"));
495
496                // Create network
497                let config = Config::test(
498                    peer_sk.clone(),
499                    *peer_addr,
500                    1_024 * 1_024, // 1MB
501                );
502                let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
503
504                // Register peers at separate indices
505                oracle
506                    .track(0, [peers[0].clone()].try_into().unwrap())
507                    .await;
508                oracle
509                    .track(1, [peers[1].clone(), peers[2].clone()].try_into().unwrap())
510                    .await;
511                oracle
512                    .track(2, peers.iter().skip(2).cloned().try_collect().unwrap())
513                    .await;
514
515                // Register basic application
516                let (mut sender, mut receiver) =
517                    network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
518
519                // Wait to connect to all peers, and then send messages to everyone
520                network.start();
521
522                // Send/Receive messages
523                let msg = peer_pk.clone();
524                let handler = context
525                    .with_label("agent")
526                    .spawn(move |context| async move {
527                        if i == 0 {
528                            // Loop until success
529                            loop {
530                                if sender
531                                    .send(Recipients::All, msg.as_ref().to_vec(), true)
532                                    .await
533                                    .unwrap()
534                                    .len()
535                                    == n - 1
536                                {
537                                    break;
538                                }
539
540                                // Sleep and try again (avoid busy loop)
541                                context.sleep(Duration::from_millis(100)).await;
542                            }
543                        } else {
544                            // Ensure message equals sender identity
545                            let (sender, message) = receiver.recv().await.unwrap();
546                            assert_eq!(message, sender.as_ref());
547                        }
548                    });
549
550                // Add to waiters
551                waiters.push(handler);
552            }
553
554            // Wait for waiters to finish (receiver before sender)
555            for waiter in waiters.into_iter().rev() {
556                waiter.await.unwrap();
557            }
558
559            // Ensure no message rate limiting occurred
560            assert_no_rate_limiting(&context);
561        });
562    }
563
564    #[test_traced]
565    fn test_message_too_large() {
566        // Configure test
567        let base_port = 3000;
568        let n: usize = 2;
569
570        // Initialize context
571        let executor = deterministic::Runner::seeded(0);
572        executor.start(|mut context| async move {
573            // Create peers
574            let mut peers_and_sks = Vec::new();
575            for i in 0..n {
576                let peer_sk = ed25519::PrivateKey::from_seed(i as u64);
577                let peer_pk = peer_sk.public_key();
578                let peer_addr =
579                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
580                peers_and_sks.push((peer_sk, peer_pk, peer_addr));
581            }
582            let peers: Map<_, _> = peers_and_sks
583                .iter()
584                .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
585                .try_collect()
586                .unwrap();
587
588            // Create network
589            let (sk, _, addr) = peers_and_sks[0].clone();
590            let config = Config::test(
591                sk,
592                addr,
593                1_024 * 1_024, // 1MB
594            );
595            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
596
597            // Register peers
598            oracle.track(0, peers.clone()).await;
599
600            // Register basic application
601            let (mut sender, _) =
602                network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
603
604            // Wait to connect to all peers, and then send messages to everyone
605            network.start();
606
607            // Crate random message
608            let mut msg = vec![0u8; 10 * 1024 * 1024]; // 10MB (greater than frame capacity)
609            context.fill_bytes(&mut msg[..]);
610
611            // Send message
612            let recipient = Recipients::One(peers[1].clone());
613            let result = sender.send(recipient, msg, true).await;
614            assert!(matches!(result, Err(Error::MessageTooLarge(_))));
615        });
616    }
617
618    #[test_traced]
619    fn test_rate_limiting() {
620        // Configure test
621        let base_port = 3000;
622        let n: usize = 2;
623
624        // Initialize context
625        let executor = deterministic::Runner::seeded(0);
626        executor.start(|context| async move {
627            // Create peers
628            let mut peers_and_sks = Vec::new();
629            for i in 0..n {
630                let sk = ed25519::PrivateKey::from_seed(i as u64);
631                let pk = sk.public_key();
632                let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
633                peers_and_sks.push((sk, pk, addr));
634            }
635            let peers: Map<_, _> = peers_and_sks
636                .iter()
637                .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
638                .try_collect()
639                .unwrap();
640            let (sk0, _, addr0) = peers_and_sks[0].clone();
641            let (sk1, pk1, addr1) = peers_and_sks[1].clone();
642
643            // Create network for peer 0
644            let config0 = Config::test(sk0, addr0, 1_024 * 1_024); // 1MB
645            let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
646            oracle0.track(0, peers.clone()).await;
647            let (mut sender0, _receiver0) =
648                network0.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
649            network0.start();
650
651            // Create network for peer 1
652            let config1 = Config::test(sk1, addr1, 1_024 * 1_024); // 1MB
653            let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
654            oracle1.track(0, peers.clone()).await;
655            let (_sender1, _receiver1) =
656                network1.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
657            network1.start();
658
659            // Send first message, which should be allowed and consume the quota.
660            let msg = vec![0u8; 1024]; // 1KB
661            loop {
662                // Confirm message is sent to peer
663                let sent = sender0
664                    .send(Recipients::One(pk1.clone()), msg.clone(), true)
665                    .await
666                    .unwrap();
667                if !sent.is_empty() {
668                    break;
669                }
670
671                // Ensure we don't rate limit outbound sends while
672                // waiting for peers to connect
673                context.sleep(Duration::from_mins(1)).await
674            }
675
676            // Immediately send the second message to trigger the rate limit.
677            // With partial sends, rate-limited recipients return empty vec (not error).
678            // Outbound rate limiting skips the peer, returns empty vec.
679            let sent = sender0.send(Recipients::One(pk1), msg, true).await.unwrap();
680            assert!(sent.is_empty());
681
682            // Give the metrics time to reflect the rate-limited message.
683            for _ in 0..10 {
684                assert_no_rate_limiting(&context);
685                context.sleep(Duration::from_millis(100)).await;
686            }
687        });
688    }
689
690    #[test_traced]
691    fn test_unordered_peer_sets() {
692        let (n, base_port) = (10, 3000);
693        let executor = deterministic::Runner::default();
694        executor.start(|context| async move {
695            // Create peers
696            let mut peers_and_sks = Vec::new();
697            for i in 0..n {
698                let sk = ed25519::PrivateKey::from_seed(i as u64);
699                let pk = sk.public_key();
700                let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
701                peers_and_sks.push((sk, pk, addr));
702            }
703            let peer0 = peers_and_sks[0].clone();
704            let config = Config::test(peer0.0, peer0.2, 1_024 * 1_024);
705            let (network, mut oracle) = Network::new(context.with_label("network"), config);
706            network.start();
707
708            // Subscribe to peer sets
709            let mut subscription = oracle.subscribe().await;
710
711            // Register initial peer set
712            let set10: Map<_, _> = peers_and_sks
713                .iter()
714                .take(2)
715                .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
716                .try_collect()
717                .unwrap();
718            oracle.track(10, set10.clone()).await;
719            let (id, new, all) = subscription.recv().await.unwrap();
720            assert_eq!(id, 10);
721            assert_eq!(&new, set10.keys());
722            assert_eq!(&all, set10.keys());
723
724            // Register old peer sets (ignored)
725            let set9: Map<_, _> = peers_and_sks
726                .iter()
727                .skip(2)
728                .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
729                .try_collect()
730                .unwrap();
731            oracle.track(9, set9.clone()).await;
732
733            // Add new peer set
734            let set11: Map<_, _> = peers_and_sks
735                .iter()
736                .skip(4)
737                .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
738                .try_collect()
739                .unwrap();
740            oracle.track(11, set11.clone()).await;
741            let (id, new, all) = subscription.recv().await.unwrap();
742            assert_eq!(id, 11);
743            assert_eq!(&new, set11.keys());
744            let all_keys: Set<_> = set10
745                .into_keys()
746                .into_iter()
747                .chain(set11.into_keys().into_iter())
748                .try_collect()
749                .unwrap();
750            assert_eq!(all, all_keys);
751        });
752    }
753
754    #[test_traced]
755    fn test_graceful_shutdown() {
756        let base_port = 3000;
757        let n: usize = 5;
758
759        let executor = deterministic::Runner::default();
760        executor.start(|context| async move {
761            // Create peers
762            let mut peers_and_sks = Vec::new();
763            for i in 0..n {
764                let sk = ed25519::PrivateKey::from_seed(i as u64);
765                let pk = sk.public_key();
766                let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
767                peers_and_sks.push((sk, pk, addr));
768            }
769            let peers: Map<_, _> = peers_and_sks
770                .iter()
771                .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
772                .try_collect()
773                .unwrap();
774
775            // Create networks for all peers
776            let (complete_sender, mut complete_receiver) = mpsc::channel(n);
777            for (i, (sk, pk, addr)) in peers_and_sks.iter().enumerate() {
778                let peer_context = context.with_label(&format!("peer_{i}"));
779                let config = Config::test(sk.clone(), *addr, 1_024 * 1_024);
780                let (mut network, mut oracle) =
781                    Network::new(peer_context.with_label("network"), config);
782
783                // Register peer set
784                oracle.track(0, peers.clone()).await;
785
786                let (mut sender, mut receiver) =
787                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
788                network.start();
789
790                peer_context.with_label("agent").spawn({
791                    let complete_sender = complete_sender.clone();
792                    let pk = pk.clone();
793                    move |context| async move {
794                        // Wait to connect to at least one other peer
795                        let expected_connections = if i == 0 { n - 1 } else { 1 };
796
797                        // Send a message
798                        loop {
799                            let sent = sender
800                                .send(Recipients::All, pk.as_ref().to_vec(), true)
801                                .await
802                                .unwrap();
803                            if sent.len() >= expected_connections {
804                                break;
805                            }
806                            context.sleep(Duration::from_millis(100)).await;
807                        }
808
809                        // Signal that this peer is connected
810                        complete_sender.send(()).await.unwrap();
811
812                        // Keep receiving messages until shutdown
813                        loop {
814                            select! {
815                                result = receiver.recv() => {
816                                    if result.is_err() {
817                                        // Channel closed due to shutdown
818                                        break;
819                                    }
820                                },
821                                _ = context.stopped() => {
822                                    // Graceful shutdown signal received
823                                    break;
824                                },
825                            }
826                        }
827                    }
828                });
829            }
830
831            // Wait for all peers to establish connectivity
832            for _ in 0..n {
833                complete_receiver.recv().await.unwrap();
834            }
835
836            // Verify that network actors started for all peers
837            let metrics_before = context.encode();
838            let is_running = |name: &str| -> bool {
839                metrics_before.lines().any(|line| {
840                    line.starts_with("runtime_tasks_running{")
841                        && line.contains(&format!("name=\"{name}\""))
842                        && line.contains("kind=\"Task\"")
843                        && line.trim_end().ends_with(" 1")
844                })
845            };
846            for i in 0..n {
847                let prefix = format!("peer_{i}_network");
848                assert!(
849                    is_running(&format!("{prefix}_tracker")),
850                    "peer_{i} tracker should be running"
851                );
852                assert!(
853                    is_running(&format!("{prefix}_router")),
854                    "peer_{i} router should be running"
855                );
856                assert!(
857                    is_running(&format!("{prefix}_spawner")),
858                    "peer_{i} spawner should be running"
859                );
860                assert!(
861                    is_running(&format!("{prefix}_listener")),
862                    "peer_{i} listener should be running"
863                );
864                assert!(
865                    is_running(&format!("{prefix}_dialer")),
866                    "peer_{i} dialer should be running"
867                );
868            }
869
870            // All peers are connected - now trigger graceful shutdown
871            let shutdown_context = context.clone();
872            context.with_label("shutdown").spawn(move |_| async move {
873                // Trigger graceful shutdown
874                let result = shutdown_context.stop(0, Some(Duration::from_secs(5))).await;
875
876                // Shutdown should complete successfully without timeout
877                assert!(
878                    result.is_ok(),
879                    "graceful shutdown should complete: {result:?}"
880                );
881            });
882
883            // Wait for shutdown to complete
884            context.stopped().await.unwrap();
885
886            // Give the runtime a tick to process task completions and update metrics
887            context.sleep(Duration::from_millis(100)).await;
888
889            // Verify that all network actors stopped
890            let metrics_after = context.encode();
891            let is_stopped = |name: &str| -> bool {
892                metrics_after.lines().any(|line| {
893                    line.starts_with("runtime_tasks_running{")
894                        && line.contains(&format!("name=\"{name}\""))
895                        && line.contains("kind=\"Task\"")
896                        && line.trim_end().ends_with(" 0")
897                })
898            };
899            for i in 0..n {
900                let prefix = format!("peer_{i}_network");
901                assert!(
902                    is_stopped(&format!("{prefix}_tracker")),
903                    "peer_{i} tracker should be stopped"
904                );
905                assert!(
906                    is_stopped(&format!("{prefix}_router")),
907                    "peer_{i} router should be stopped"
908                );
909                assert!(
910                    is_stopped(&format!("{prefix}_spawner")),
911                    "peer_{i} spawner should be stopped"
912                );
913                assert!(
914                    is_stopped(&format!("{prefix}_listener")),
915                    "peer_{i} listener should be stopped"
916                );
917                assert!(
918                    is_stopped(&format!("{prefix}_dialer")),
919                    "peer_{i} dialer should be stopped"
920                );
921            }
922        });
923    }
924
925    #[test_traced]
926    fn test_subscription_includes_self_when_registered() {
927        let base_port = 3000;
928        let executor = deterministic::Runner::default();
929        executor.start(|context| async move {
930            // Create self (peer0) and other peers
931            let self_sk = ed25519::PrivateKey::from_seed(0);
932            let self_pk = self_sk.public_key();
933            let self_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
934
935            let other_pk = ed25519::PrivateKey::from_seed(1).public_key();
936            let other_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
937
938            // Create network for peer0 (self)
939            let config = Config::test(self_sk, self_addr, 1_024 * 1_024);
940            let (network, mut oracle) = Network::new(context.with_label("network"), config);
941            network.start();
942
943            // Subscribe to peer sets
944            let mut subscription = oracle.subscribe().await;
945
946            // Register a peer set that does NOT include self
947            let peer_set: Map<_, _> = [(other_pk.clone(), other_addr.into())].try_into().unwrap();
948            oracle.track(1, peer_set.clone()).await;
949
950            // Receive subscription notification
951            let (id, new, all) = subscription.recv().await.unwrap();
952            assert_eq!(id, 1);
953            assert_eq!(new.len(), 1);
954            assert_eq!(all.len(), 1);
955
956            // Self should NOT be in the new set
957            assert!(
958                new.position(&self_pk).is_none(),
959                "new set should not include self"
960            );
961            assert!(
962                new.position(&other_pk).is_some(),
963                "new set should include other"
964            );
965
966            // Self should NOT be in the tracked set (not registered)
967            assert!(
968                all.position(&self_pk).is_none(),
969                "tracked peers should not include self"
970            );
971            assert!(
972                all.position(&other_pk).is_some(),
973                "tracked peers should include other"
974            );
975
976            // Now register a peer set that DOES include self
977            let peer_set: Map<_, _> = [
978                (self_pk.clone(), self_addr.into()),
979                (other_pk.clone(), other_addr.into()),
980            ]
981            .try_into()
982            .unwrap();
983            oracle.track(2, peer_set.clone()).await;
984
985            // Receive subscription notification
986            let (id, new, all) = subscription.recv().await.unwrap();
987            assert_eq!(id, 2);
988            assert_eq!(new.len(), 2);
989            assert_eq!(all.len(), 2);
990
991            // Both peers should be in the new set
992            assert!(
993                new.position(&self_pk).is_some(),
994                "new set should include self"
995            );
996            assert!(
997                new.position(&other_pk).is_some(),
998                "new set should include other"
999            );
1000
1001            // Both peers should be in the tracked set
1002            assert!(
1003                all.position(&self_pk).is_some(),
1004                "tracked peers should include self"
1005            );
1006            assert!(
1007                all.position(&other_pk).is_some(),
1008                "tracked peers should include other"
1009            );
1010        });
1011    }
1012
1013    #[test_traced]
1014    fn test_dns_peer_addresses() {
1015        let base_port = 3200;
1016        let n: usize = 3;
1017
1018        let executor = deterministic::Runner::default();
1019        executor.start(|context| async move {
1020            // Create peers
1021            let mut peers_and_sks = Vec::new();
1022            for i in 0..n {
1023                let private_key = ed25519::PrivateKey::from_seed(i as u64);
1024                let public_key = private_key.public_key();
1025                let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
1026                let host_str = format!("peer-{i}.local");
1027                let host = Hostname::new(&host_str).unwrap();
1028                peers_and_sks.push((private_key, public_key, socket, host_str, host));
1029            }
1030
1031            // Register DNS mappings for all peers
1032            for (_, _, socket, host_str, _) in &peers_and_sks {
1033                context.resolver_register(host_str.clone(), Some(vec![socket.ip()]));
1034            }
1035
1036            // Create peer addresses with DNS ingress
1037            let peers: Vec<(_, Address)> = peers_and_sks
1038                .iter()
1039                .map(|(_, pk, socket, _, host)| {
1040                    (
1041                        pk.clone(),
1042                        Address::Asymmetric {
1043                            ingress: Ingress::Dns {
1044                                host: host.clone(),
1045                                port: socket.port(),
1046                            },
1047                            egress: *socket,
1048                        },
1049                    )
1050                })
1051                .collect();
1052
1053            // Create networks
1054            let (complete_sender, mut complete_receiver) = mpsc::channel(n);
1055            for (i, (private_key, public_key, socket, _, _)) in peers_and_sks.iter().enumerate() {
1056                let context = context.with_label(&format!("peer_{i}"));
1057
1058                // Create network
1059                let config = Config::test(private_key.clone(), *socket, 1_024 * 1_024);
1060                let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
1061
1062                // Register peers with DNS addresses
1063                oracle.track(0, peers.clone().try_into().unwrap()).await;
1064
1065                // Register channel
1066                let (mut sender, mut receiver) =
1067                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1068
1069                network.start();
1070
1071                // Send/Receive messages
1072                let pk = public_key.clone();
1073                context.with_label("agent").spawn({
1074                    let complete_sender = complete_sender.clone();
1075                    let peers = peers.clone();
1076                    move |context| async move {
1077                        // Wait for messages from other peers
1078                        let receiver = context.with_label("receiver").spawn(move |_| async move {
1079                            let mut received = HashSet::new();
1080                            while received.len() < n - 1 {
1081                                let (sender, message) = receiver.recv().await.unwrap();
1082                                assert_eq!(message, sender.as_ref());
1083                                received.insert(sender);
1084                            }
1085                            complete_sender.send(()).await.unwrap();
1086
1087                            loop {
1088                                receiver.recv().await.unwrap();
1089                            }
1090                        });
1091
1092                        // Send identity to all peers
1093                        let sender_task =
1094                            context
1095                                .with_label("sender")
1096                                .spawn(move |context| async move {
1097                                    loop {
1098                                        let mut recipients: Vec<_> = peers
1099                                            .iter()
1100                                            .filter(|(p, _)| p != &pk)
1101                                            .map(|(p, _)| p.clone())
1102                                            .collect();
1103                                        recipients.sort();
1104
1105                                        loop {
1106                                            let mut sent = sender
1107                                                .send(Recipients::All, pk.as_ref().to_vec(), true)
1108                                                .await
1109                                                .unwrap();
1110                                            if sent.len() != n - 1 {
1111                                                context.sleep(Duration::from_millis(100)).await;
1112                                                continue;
1113                                            }
1114                                            sent.sort();
1115                                            assert_eq!(sent, recipients);
1116                                            break;
1117                                        }
1118
1119                                        context.sleep(Duration::from_secs(10)).await;
1120                                    }
1121                                });
1122
1123                        select! {
1124                            receiver = receiver => {
1125                                panic!("receiver exited: {receiver:?}")
1126                            },
1127                            sender = sender_task => {
1128                                panic!("sender exited: {sender:?}")
1129                            },
1130                        }
1131                    }
1132                });
1133            }
1134
1135            // Wait for all peers to exchange messages
1136            for _ in 0..n {
1137                complete_receiver.recv().await.unwrap();
1138            }
1139
1140            assert_no_rate_limiting(&context);
1141        });
1142    }
1143
1144    #[test_traced]
1145    fn test_mixed_socket_and_dns_addresses() {
1146        let base_port = 3300;
1147        let n: usize = 4;
1148
1149        let executor = deterministic::Runner::default();
1150        executor.start(|context| async move {
1151            // Create peers
1152            let mut peers_and_sks = Vec::new();
1153            for i in 0..n {
1154                let private_key = ed25519::PrivateKey::from_seed(i as u64);
1155                let public_key = private_key.public_key();
1156                let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
1157                peers_and_sks.push((private_key, public_key, socket));
1158            }
1159
1160            // Register DNS mappings for peers 2 and 3 only
1161            for (i, (_, _, socket)) in peers_and_sks.iter().enumerate().skip(2) {
1162                context.resolver_register(format!("peer-{i}.local"), Some(vec![socket.ip()]));
1163            }
1164
1165            // Create peer addresses - peers 0,1 use Symmetric, peers 2,3 use DNS Asymmetric
1166            let peers: Vec<(_, Address)> = peers_and_sks
1167                .iter()
1168                .enumerate()
1169                .map(|(i, (_, pk, socket))| {
1170                    let addr = if i < 2 {
1171                        // Peers 0, 1: Symmetric socket address
1172                        Address::Symmetric(*socket)
1173                    } else {
1174                        // Peers 2, 3: Asymmetric with DNS ingress
1175                        Address::Asymmetric {
1176                            ingress: Ingress::Dns {
1177                                host: hostname!(&format!("peer-{i}.local")),
1178                                port: socket.port(),
1179                            },
1180                            egress: *socket,
1181                        }
1182                    };
1183                    (pk.clone(), addr)
1184                })
1185                .collect();
1186
1187            // Create networks
1188            let (complete_sender, mut complete_receiver) = mpsc::channel(n);
1189            for (i, (private_key, public_key, socket)) in peers_and_sks.iter().enumerate() {
1190                let context = context.with_label(&format!("peer_{i}"));
1191
1192                // Create network
1193                let config = Config::test(private_key.clone(), *socket, 1_024 * 1_024);
1194                let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
1195
1196                // Register peers with mixed addresses
1197                oracle.track(0, peers.clone().try_into().unwrap()).await;
1198
1199                // Register channel
1200                let (mut sender, mut receiver) =
1201                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1202
1203                network.start();
1204
1205                // Send/Receive messages
1206                let pk = public_key.clone();
1207                context.with_label("agent").spawn({
1208                    let complete_sender = complete_sender.clone();
1209                    let peers = peers.clone();
1210                    move |context| async move {
1211                        // Wait for messages from other peers
1212                        let receiver = context.with_label("receiver").spawn(move |_| async move {
1213                            let mut received = HashSet::new();
1214                            while received.len() < n - 1 {
1215                                let (sender, message) = receiver.recv().await.unwrap();
1216                                assert_eq!(message, sender.as_ref());
1217                                received.insert(sender);
1218                            }
1219                            complete_sender.send(()).await.unwrap();
1220
1221                            loop {
1222                                receiver.recv().await.unwrap();
1223                            }
1224                        });
1225
1226                        // Send identity to all peers
1227                        let sender_task =
1228                            context
1229                                .with_label("sender")
1230                                .spawn(move |context| async move {
1231                                    loop {
1232                                        let mut recipients: Vec<_> = peers
1233                                            .iter()
1234                                            .filter(|(p, _)| p != &pk)
1235                                            .map(|(p, _)| p.clone())
1236                                            .collect();
1237                                        recipients.sort();
1238
1239                                        loop {
1240                                            let mut sent = sender
1241                                                .send(Recipients::All, pk.as_ref().to_vec(), true)
1242                                                .await
1243                                                .unwrap();
1244                                            if sent.len() != n - 1 {
1245                                                context.sleep(Duration::from_millis(100)).await;
1246                                                continue;
1247                                            }
1248                                            sent.sort();
1249                                            assert_eq!(sent, recipients);
1250                                            break;
1251                                        }
1252
1253                                        context.sleep(Duration::from_secs(10)).await;
1254                                    }
1255                                });
1256
1257                        select! {
1258                            receiver = receiver => {
1259                                panic!("receiver exited: {receiver:?}")
1260                            },
1261                            sender = sender_task => {
1262                                panic!("sender exited: {sender:?}")
1263                            },
1264                        }
1265                    }
1266                });
1267            }
1268
1269            // Wait for all peers to exchange messages
1270            for _ in 0..n {
1271                complete_receiver.recv().await.unwrap();
1272            }
1273
1274            assert_no_rate_limiting(&context);
1275        });
1276    }
1277
1278    #[test_traced]
1279    fn test_dns_resolving_to_private_ip_not_dialed() {
1280        // Test that when allow_private_ips=false, DNS addresses that resolve
1281        // to private IPs are not dialed.
1282        let base_port = 4400;
1283        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1284        executor.start(|context| async move {
1285            let peer0 = ed25519::PrivateKey::from_seed(0);
1286            let peer1 = ed25519::PrivateKey::from_seed(1);
1287
1288            let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
1289            let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
1290
1291            // Register DNS mapping that resolves to localhost (private IP)
1292            context.resolver_register("peer-0.local".to_string(), Some(vec![socket0.ip()]));
1293
1294            // Create peer 0 with allow_private_ips=true
1295            let mut config0 = Config::test(peer0.clone(), socket0, 1_024 * 1_024);
1296            config0.allow_private_ips = true;
1297            let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
1298
1299            // Peer 0 knows about peer 1 with a socket address
1300            let peers0: Vec<(_, Address)> = vec![
1301                (peer0.public_key(), Address::Symmetric(socket0)),
1302                (peer1.public_key(), Address::Symmetric(socket1)),
1303            ];
1304            oracle0.track(0, peers0.try_into().unwrap()).await;
1305
1306            let (_sender0, mut receiver0) =
1307                network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1308            network0.start();
1309
1310            // Create peer 1 with allow_private_ips=false
1311            let mut config1 = Config::test(peer1.clone(), socket1, 1_024 * 1_024);
1312            config1.allow_private_ips = false; // This should prevent dialing the private IP
1313            let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
1314
1315            // Peer 1 knows about peer 0 with a DNS address that resolves to private IP
1316            let peers1: Vec<(_, Address)> = vec![
1317                (
1318                    peer0.public_key(),
1319                    Address::Asymmetric {
1320                        ingress: Ingress::Dns {
1321                            host: hostname!("peer-0.local"),
1322                            port: socket0.port(),
1323                        },
1324                        egress: socket0,
1325                    },
1326                ),
1327                (peer1.public_key(), Address::Symmetric(socket1)),
1328            ];
1329            oracle1.track(0, peers1.try_into().unwrap()).await;
1330
1331            let (mut sender1, _receiver1) =
1332                network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1333            network1.start();
1334
1335            // Wait for a period during which peer 1 would normally connect
1336            context.sleep(Duration::from_secs(5)).await;
1337
1338            // Try to send from peer 1 - should not reach anyone since private IPs are blocked
1339            let sent = sender1
1340                .send(Recipients::All, peer1.public_key().as_ref().to_vec(), true)
1341                .await
1342                .unwrap();
1343            assert!(
1344                sent.is_empty(),
1345                "peer 1 should not have connected to peer 0 (private IP)"
1346            );
1347
1348            // Verify peer 0 received nothing from peer 1
1349            select! {
1350                msg = receiver0.recv() => {
1351                    panic!("peer 0 should not have received any message, got: {msg:?}");
1352                },
1353                _ = context.sleep(Duration::from_secs(1)) => {
1354                    // Expected: timeout with no message
1355                },
1356            }
1357        });
1358    }
1359
1360    #[test_traced]
1361    fn test_dns_mixed_ips_connectivity() {
1362        // Test that peers can connect even when DNS resolves to multiple IPs
1363        // where most are unreachable. The dialer randomly picks an IP, so
1364        // eventually it should pick the reachable one.
1365        //
1366        // Run over 25 seeds to ensure we exercise the random IP selection.
1367        for seed in 0..25 {
1368            let base_port = 3500;
1369
1370            let cfg = deterministic::Config::default()
1371                .with_seed(seed)
1372                .with_timeout(Some(Duration::from_secs(120)));
1373            let executor = deterministic::Runner::new(cfg);
1374            executor.start(|context| async move {
1375                let peer0 = ed25519::PrivateKey::from_seed(0);
1376                let peer1 = ed25519::PrivateKey::from_seed(1);
1377
1378                let good_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
1379                let socket0 = SocketAddr::new(good_ip, base_port);
1380                let socket1 = SocketAddr::new(good_ip, base_port + 1);
1381
1382                // Register DNS mappings with 3 bad IPs and 1 good IP for both peers
1383                let mut all_ips0: Vec<IpAddr> = (1..=3)
1384                    .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 100 + i)))
1385                    .collect();
1386                all_ips0.push(good_ip);
1387                context.resolver_register("peer-0.local", Some(all_ips0));
1388
1389                let mut all_ips1: Vec<IpAddr> = (1..=3)
1390                    .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 110 + i)))
1391                    .collect();
1392                all_ips1.push(good_ip);
1393                context.resolver_register("peer-1.local", Some(all_ips1));
1394
1395                // Create peer addresses - both peers use DNS with mixed IPs
1396                let peers: Vec<(_, Address)> = vec![
1397                    (
1398                        peer0.public_key(),
1399                        Address::Asymmetric {
1400                            ingress: Ingress::Dns {
1401                                host: hostname!("peer-0.local"),
1402                                port: base_port,
1403                            },
1404                            egress: socket0,
1405                        },
1406                    ),
1407                    (
1408                        peer1.public_key(),
1409                        Address::Asymmetric {
1410                            ingress: Ingress::Dns {
1411                                host: hostname!("peer-1.local"),
1412                                port: base_port + 1,
1413                            },
1414                            egress: socket1,
1415                        },
1416                    ),
1417                ];
1418
1419                // Create peer 0
1420                let config0 = Config::test(peer0.clone(), socket0, 1_024 * 1_024);
1421                let (mut network0, mut oracle0) =
1422                    Network::new(context.with_label("peer_0"), config0);
1423                oracle0.track(0, peers.clone().try_into().unwrap()).await;
1424                let (_sender0, mut receiver0) =
1425                    network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1426                network0.start();
1427
1428                // Create peer 1
1429                let config1 = Config::test(peer1.clone(), socket1, 1_024 * 1_024);
1430                let (mut network1, mut oracle1) =
1431                    Network::new(context.with_label("peer_1"), config1);
1432                oracle1.track(0, peers.clone().try_into().unwrap()).await;
1433                let (mut sender1, _receiver1) =
1434                    network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1435                network1.start();
1436
1437                // Wait for peers to connect (may take multiple attempts due to random IP selection)
1438                let pk0 = peer0.public_key();
1439                loop {
1440                    let sent = sender1
1441                        .send(
1442                            Recipients::One(pk0.clone()),
1443                            peer1.public_key().as_ref().to_vec(),
1444                            true,
1445                        )
1446                        .await
1447                        .unwrap();
1448                    if !sent.is_empty() {
1449                        break;
1450                    }
1451                    context.sleep(Duration::from_millis(100)).await;
1452                }
1453
1454                // Verify peer 0 received the message
1455                let (sender, msg) = receiver0.recv().await.unwrap();
1456                assert_eq!(sender, peer1.public_key());
1457                assert_eq!(msg, peer1.public_key().as_ref());
1458            });
1459        }
1460    }
1461
1462    #[test_traced]
1463    fn test_many_peer_restart_with_new_address() {
1464        let base_port = 9500;
1465        let n = 5;
1466
1467        let executor = deterministic::Runner::default();
1468        executor.start(|context| async move {
1469            // Create peers
1470            let peers: Vec<_> = (0..n)
1471                .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1472                .collect();
1473            let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1474
1475            // Track senders/receivers/oracles/handles across restarts
1476            let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1477            let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1478            let mut oracles: Vec<Option<Oracle<_>>> = (0..n).map(|_| None).collect();
1479            let mut handles: Vec<Option<commonware_runtime::Handle<()>>> =
1480                (0..n).map(|_| None).collect();
1481
1482            // Track port allocations (updated on restart)
1483            let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1484
1485            // Create initial peer set with addresses
1486            let peer_set: Vec<(_, Address)> = addresses
1487                .iter()
1488                .enumerate()
1489                .map(|(i, pk)| {
1490                    (
1491                        pk.clone(),
1492                        Address::Symmetric(SocketAddr::new(
1493                            IpAddr::V4(Ipv4Addr::LOCALHOST),
1494                            ports[i],
1495                        )),
1496                    )
1497                })
1498                .collect();
1499
1500            // Create networks for all peers
1501            for (i, peer) in peers.iter().enumerate() {
1502                let peer_context = context.with_label(&format!("peer_{i}"));
1503
1504                let config = Config::test(
1505                    peer.clone(),
1506                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
1507                    MAX_MESSAGE_SIZE,
1508                );
1509                let (mut network, mut oracle) =
1510                    Network::new(peer_context.with_label("network"), config);
1511
1512                // Register peer set
1513                oracle.track(0, peer_set.clone().try_into().unwrap()).await;
1514
1515                let (sender, receiver) =
1516                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1517                senders[i] = Some(sender);
1518                receivers[i] = Some(receiver);
1519                oracles[i] = Some(oracle);
1520
1521                let handle = network.start();
1522                handles[i] = Some(handle);
1523            }
1524
1525            // Wait for full connectivity (each peer can send to all others)
1526            for (i, sender) in senders.iter_mut().enumerate() {
1527                let sender = sender.as_mut().unwrap();
1528                loop {
1529                    let sent = sender
1530                        .send(
1531                            Recipients::All,
1532                            peers[i].public_key().as_ref().to_vec(),
1533                            true,
1534                        )
1535                        .await
1536                        .unwrap();
1537                    if sent.len() == n - 1 {
1538                        break;
1539                    }
1540                    context.sleep(Duration::from_millis(100)).await;
1541                }
1542            }
1543
1544            // Verify each peer can receive from all others
1545            for receiver in receivers.iter_mut() {
1546                let receiver = receiver.as_mut().unwrap();
1547                let mut received = HashSet::new();
1548                while received.len() < n - 1 {
1549                    let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1550                    assert_eq!(message, sender.as_ref());
1551                    received.insert(sender);
1552                }
1553            }
1554
1555            // Restart each non-first peer with a new port, multiple rounds
1556            let mut restart_counter = 0u16;
1557            for round in 0..3 {
1558                for restart_peer_idx in 1..n {
1559                    // Allocate a new unique port
1560                    restart_counter += 1;
1561                    let new_port = base_port + 100 + restart_counter;
1562                    ports[restart_peer_idx] = new_port;
1563
1564                    // Abort the peer's network
1565                    if let Some(handle) = handles[restart_peer_idx].take() {
1566                        handle.abort();
1567                    }
1568                    senders[restart_peer_idx] = None;
1569                    receivers[restart_peer_idx] = None;
1570                    oracles[restart_peer_idx] = None;
1571
1572                    // Create updated peer set with new port
1573                    let updated_peer_set: Vec<(_, Address)> = addresses
1574                        .iter()
1575                        .enumerate()
1576                        .map(|(i, pk)| {
1577                            (
1578                                pk.clone(),
1579                                Address::Symmetric(SocketAddr::new(
1580                                    IpAddr::V4(Ipv4Addr::LOCALHOST),
1581                                    ports[i],
1582                                )),
1583                            )
1584                        })
1585                        .collect();
1586
1587                    // Update oracle on all running peers
1588                    for oracle in oracles.iter_mut().flatten() {
1589                        oracle
1590                            .track(
1591                                (round * (n - 1) + restart_peer_idx) as u64,
1592                                updated_peer_set.clone().try_into().unwrap(),
1593                            )
1594                            .await;
1595                    }
1596
1597                    // Restart the peer with new port
1598                    let peer_context =
1599                        context.with_label(&format!("peer_{restart_peer_idx}_round_{round}"));
1600                    let config = Config::test(
1601                        peers[restart_peer_idx].clone(),
1602                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), new_port),
1603                        MAX_MESSAGE_SIZE,
1604                    );
1605                    let (mut network, mut oracle) =
1606                        Network::new(peer_context.with_label("network"), config);
1607
1608                    oracle
1609                        .track(
1610                            (round * (n - 1) + restart_peer_idx) as u64,
1611                            updated_peer_set.clone().try_into().unwrap(),
1612                        )
1613                        .await;
1614
1615                    let (sender, receiver) = network.register(
1616                        0,
1617                        Quota::per_second(NZU32!(100)),
1618                        DEFAULT_MESSAGE_BACKLOG,
1619                    );
1620                    senders[restart_peer_idx] = Some(sender);
1621                    receivers[restart_peer_idx] = Some(receiver);
1622                    oracles[restart_peer_idx] = Some(oracle);
1623
1624                    let handle = network.start();
1625                    handles[restart_peer_idx] = Some(handle);
1626
1627                    // Wait for the restarted peer to reconnect to all others
1628                    let restarted_sender = senders[restart_peer_idx].as_mut().unwrap();
1629                    loop {
1630                        let sent = restarted_sender
1631                            .send(
1632                                Recipients::All,
1633                                peers[restart_peer_idx].public_key().as_ref().to_vec(),
1634                                true,
1635                            )
1636                            .await
1637                            .unwrap();
1638                        if sent.len() == n - 1 {
1639                            break;
1640                        }
1641                        context.sleep(Duration::from_millis(100)).await;
1642                    }
1643
1644                    // Verify all other peers can send to the restarted peer
1645                    for i in 0..n {
1646                        if i == restart_peer_idx {
1647                            continue;
1648                        }
1649                        let sender = senders[i].as_mut().unwrap();
1650                        loop {
1651                            let sent = sender
1652                                .send(
1653                                    Recipients::One(addresses[restart_peer_idx].clone()),
1654                                    peers[i].public_key().as_ref().to_vec(),
1655                                    true,
1656                                )
1657                                .await
1658                                .unwrap();
1659                            if sent.len() == 1 {
1660                                break;
1661                            }
1662                            context.sleep(Duration::from_millis(100)).await;
1663                        }
1664                    }
1665
1666                    // Verify the restarted peer receives from all others
1667                    let restarted_receiver = receivers[restart_peer_idx].as_mut().unwrap();
1668                    let mut received = HashSet::new();
1669                    while received.len() < n - 1 {
1670                        let (sender, message): (ed25519::PublicKey, _) =
1671                            restarted_receiver.recv().await.unwrap();
1672                        assert_eq!(message, sender.as_ref());
1673                        received.insert(sender);
1674                    }
1675                }
1676            }
1677
1678            assert_no_rate_limiting(&context);
1679        });
1680    }
1681
1682    #[test_traced]
1683    fn test_simultaneous_peer_restart() {
1684        let base_port = 9700;
1685        let n = 5;
1686
1687        let executor = deterministic::Runner::default();
1688        executor.start(|context| async move {
1689            // Create peers
1690            let peers: Vec<_> = (0..n)
1691                .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1692                .collect();
1693            let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1694
1695            // Track port allocations (updated on restart)
1696            let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1697
1698            // Create initial peer set with addresses
1699            let peer_set: Vec<(_, Address)> = addresses
1700                .iter()
1701                .enumerate()
1702                .map(|(i, pk)| {
1703                    (
1704                        pk.clone(),
1705                        Address::Symmetric(SocketAddr::new(
1706                            IpAddr::V4(Ipv4Addr::LOCALHOST),
1707                            ports[i],
1708                        )),
1709                    )
1710                })
1711                .collect();
1712
1713            // Track senders/receivers/oracles/handles across restarts
1714            let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1715            let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1716            let mut oracles: Vec<Option<Oracle<_>>> = (0..n).map(|_| None).collect();
1717            let mut handles: Vec<Option<commonware_runtime::Handle<()>>> =
1718                (0..n).map(|_| None).collect();
1719
1720            // Create networks for all peers
1721            for (i, peer) in peers.iter().enumerate() {
1722                let peer_context = context.with_label(&format!("peer_{i}"));
1723
1724                let config = Config::test(
1725                    peer.clone(),
1726                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
1727                    MAX_MESSAGE_SIZE,
1728                );
1729                let (mut network, mut oracle) =
1730                    Network::new(peer_context.with_label("network"), config);
1731
1732                // Register peer set
1733                oracle.track(0, peer_set.clone().try_into().unwrap()).await;
1734
1735                let (sender, receiver) =
1736                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1737                senders[i] = Some(sender);
1738                receivers[i] = Some(receiver);
1739                oracles[i] = Some(oracle);
1740
1741                let handle = network.start();
1742                handles[i] = Some(handle);
1743            }
1744
1745            // Wait for full connectivity
1746            for (i, sender) in senders.iter_mut().enumerate() {
1747                let sender = sender.as_mut().unwrap();
1748                loop {
1749                    let sent = sender
1750                        .send(
1751                            Recipients::All,
1752                            peers[i].public_key().as_ref().to_vec(),
1753                            true,
1754                        )
1755                        .await
1756                        .unwrap();
1757                    if sent.len() == n - 1 {
1758                        break;
1759                    }
1760                    context.sleep(Duration::from_millis(100)).await;
1761                }
1762            }
1763
1764            // Verify each peer can receive from all others
1765            for receiver in receivers.iter_mut() {
1766                let receiver = receiver.as_mut().unwrap();
1767                let mut received = HashSet::new();
1768                while received.len() < n - 1 {
1769                    let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1770                    assert_eq!(message, sender.as_ref());
1771                    received.insert(sender);
1772                }
1773            }
1774
1775            // Shutdown all peers except peer 0 simultaneously.
1776            //
1777            // We keep peer 0 alive to exercise the case where multiple
1778            // peers churn at once.
1779            let restart_peers: Vec<usize> = (1..n).collect();
1780            for &idx in &restart_peers {
1781                if let Some(handle) = handles[idx].take() {
1782                    handle.abort();
1783                }
1784                senders[idx] = None;
1785                receivers[idx] = None;
1786                oracles[idx] = None;
1787                ports[idx] = base_port + 100 + idx as u16;
1788            }
1789
1790            // Wait for connections to be detected as closed
1791            context.sleep(Duration::from_secs(2)).await;
1792
1793            // Create updated peer set with new ports
1794            let updated_peer_set: Vec<(_, Address)> = addresses
1795                .iter()
1796                .enumerate()
1797                .map(|(i, pk)| {
1798                    (
1799                        pk.clone(),
1800                        Address::Symmetric(SocketAddr::new(
1801                            IpAddr::V4(Ipv4Addr::LOCALHOST),
1802                            ports[i],
1803                        )),
1804                    )
1805                })
1806                .collect();
1807
1808            // Update oracle on peer 0 (the only running peer)
1809            oracles[0]
1810                .as_mut()
1811                .unwrap()
1812                .track(1, updated_peer_set.clone().try_into().unwrap())
1813                .await;
1814
1815            // Restart all shutdown peers with new ports
1816            for &idx in &restart_peers {
1817                let peer_context = context.with_label(&format!("peer_{idx}_restarted"));
1818                let config = Config::test(
1819                    peers[idx].clone(),
1820                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[idx]),
1821                    MAX_MESSAGE_SIZE,
1822                );
1823                let (mut network, mut oracle) =
1824                    Network::new(peer_context.with_label("network"), config);
1825
1826                oracle
1827                    .track(1, updated_peer_set.clone().try_into().unwrap())
1828                    .await;
1829
1830                let (sender, receiver) =
1831                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1832                senders[idx] = Some(sender);
1833                receivers[idx] = Some(receiver);
1834                oracles[idx] = Some(oracle);
1835
1836                let handle = network.start();
1837                handles[idx] = Some(handle);
1838            }
1839
1840            // Wait for full connectivity after restart
1841            for (i, sender) in senders.iter_mut().enumerate() {
1842                let sender = sender.as_mut().unwrap();
1843                loop {
1844                    let sent = sender
1845                        .send(
1846                            Recipients::All,
1847                            peers[i].public_key().as_ref().to_vec(),
1848                            true,
1849                        )
1850                        .await
1851                        .unwrap();
1852                    if sent.len() == n - 1 {
1853                        break;
1854                    }
1855                    context.sleep(Duration::from_millis(100)).await;
1856                }
1857            }
1858
1859            // Verify all peers can receive from all others after restart
1860            for receiver in receivers.iter_mut() {
1861                let receiver = receiver.as_mut().unwrap();
1862                let mut received = HashSet::new();
1863                while received.len() < n - 1 {
1864                    let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1865                    assert_eq!(message, sender.as_ref());
1866                    received.insert(sender);
1867                }
1868            }
1869
1870            assert_no_rate_limiting(&context);
1871        });
1872    }
1873    #[test_traced]
1874    fn test_operations_after_shutdown_do_not_panic() {
1875        let executor = deterministic::Runner::default();
1876        executor.start(|context| async move {
1877            let peer = ed25519::PrivateKey::from_seed(0);
1878            let address = peer.public_key();
1879
1880            let peer_context = context.with_label("peer");
1881            let config = Config::test(
1882                peer.clone(),
1883                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200),
1884                MAX_MESSAGE_SIZE,
1885            );
1886            let (mut network, mut oracle) =
1887                Network::new(peer_context.with_label("network"), config);
1888
1889            // Register channel and peer set
1890            let (mut sender, _receiver) =
1891                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1892            let peer_addr =
1893                Address::Symmetric(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200));
1894            let peers: Map<ed25519::PublicKey, Address> =
1895                vec![(address.clone(), peer_addr)].try_into().unwrap();
1896            oracle.track(0, peers.clone()).await;
1897
1898            // Start and immediately abort the network
1899            let handle = network.start();
1900            handle.abort();
1901
1902            // Wait for shutdown to propagate
1903            context.sleep(Duration::from_millis(100)).await;
1904
1905            // Oracle operations should not panic even after shutdown
1906            oracle.track(1, peers.clone()).await;
1907            let _ = oracle.peer_set(0).await;
1908            let _ = oracle.subscribe().await;
1909            oracle.block(address.clone()).await;
1910
1911            // Sender operations should not panic even after shutdown
1912            let sent = sender
1913                .send(Recipients::All, address.as_ref().to_vec(), true)
1914                .await
1915                .unwrap();
1916            assert!(sent.is_empty());
1917        });
1918    }
1919
1920    fn clean_shutdown(seed: u64) {
1921        let cfg = deterministic::Config::default()
1922            .with_seed(seed)
1923            .with_timeout(Some(Duration::from_secs(30)));
1924        let executor = deterministic::Runner::new(cfg);
1925        executor.start(|context| async move {
1926            let peer = ed25519::PrivateKey::from_seed(0);
1927
1928            let peer_context = context.with_label("peer");
1929            let config = Config::test(
1930                peer.clone(),
1931                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200),
1932                MAX_MESSAGE_SIZE,
1933            );
1934            let (mut network, mut oracle) =
1935                Network::new(peer_context.with_label("network"), config);
1936
1937            // Register channel and peer set
1938            let (_, _) =
1939                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1940            let peer_addr =
1941                Address::Symmetric(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200));
1942            let peers: Map<ed25519::PublicKey, Address> =
1943                vec![(peer.public_key(), peer_addr)].try_into().unwrap();
1944            oracle.track(0, peers).await;
1945
1946            // Start the network
1947            let handle = network.start();
1948
1949            // Allow tasks to start
1950            context.sleep(Duration::from_millis(100)).await;
1951
1952            // Count running tasks under the network prefix
1953            let running_before = count_running_tasks(&context, "peer_network");
1954            assert!(
1955                running_before > 0,
1956                "at least one network task should be running"
1957            );
1958
1959            // Abort the network
1960            handle.abort();
1961            let _ = handle.await;
1962
1963            // Give the runtime a tick to process aborts
1964            context.sleep(Duration::from_millis(100)).await;
1965
1966            // Verify all network tasks are stopped
1967            let running_after = count_running_tasks(&context, "peer_network");
1968            assert_eq!(
1969                running_after, 0,
1970                "all network tasks should be stopped, but {running_after} still running"
1971            );
1972        });
1973    }
1974
1975    #[test_traced]
1976    fn test_clean_shutdown() {
1977        for seed in 0..25 {
1978            clean_shutdown(seed);
1979        }
1980    }
1981
1982    fn duplicate_addresses_disconnected(seed: u64) {
1983        let base_port = 6000;
1984        let executor = deterministic::Runner::seeded(seed);
1985        executor.start(|context| async move {
1986            let peer0 = ed25519::PrivateKey::from_seed(0);
1987            let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
1988            let wrong_socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 100);
1989            let peer1 = ed25519::PrivateKey::from_seed(1);
1990            let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
1991            let peer2 = ed25519::PrivateKey::from_seed(2);
1992
1993            // Start peer 0
1994            let config0 = Config::test(peer0.clone(), socket0, MAX_MESSAGE_SIZE);
1995            let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
1996            let (mut sender0, _receiver0) =
1997                network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1998            network0.start();
1999
2000            // Create peer set with an incorrect address for peer 2
2001            let peer_set0: Vec<(_, Address)> = vec![
2002                (peer0.public_key(), Address::Symmetric(socket0)),
2003                (peer1.public_key(), Address::Symmetric(socket1)),
2004                (peer2.public_key(), Address::Symmetric(socket1)),
2005            ];
2006            oracle0.track(0, peer_set0.try_into().unwrap()).await;
2007
2008            // Wait for connections to be attempted
2009            context.sleep(Duration::from_secs(30)).await;
2010
2011            // Peer 0 can't send to anyone
2012            let sent = sender0
2013                .send(Recipients::All, peer1.public_key().as_ref(), true)
2014                .await
2015                .unwrap();
2016            assert!(sent.is_empty());
2017
2018            // Start peer 1 (has duplicate but correct address)
2019            let config1 = Config::test(peer1.clone(), socket1, MAX_MESSAGE_SIZE);
2020            let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
2021            let (_sender1, mut receiver1) =
2022                network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2023            network1.start();
2024
2025            // Create peer set where peer 0 must dial peer 1 for connection
2026            let peer_set1: Vec<(_, Address)> = vec![
2027                (peer0.public_key(), Address::Symmetric(wrong_socket0)),
2028                (peer1.public_key(), Address::Symmetric(socket1)),
2029                (peer2.public_key(), Address::Symmetric(socket1)),
2030            ];
2031            oracle1.track(0, peer_set1.try_into().unwrap()).await;
2032
2033            // Wait for connections to be made
2034            context.sleep(Duration::from_secs(30)).await;
2035
2036            // Now peer 0 should connect to peer 1 at correct address
2037            loop {
2038                let sent = sender0
2039                    .send(Recipients::All, peer0.public_key().as_ref(), true)
2040                    .await
2041                    .unwrap();
2042                if sent.len() == 1 {
2043                    assert_eq!(sent[0], peer1.public_key());
2044                    break;
2045                }
2046                context.sleep(Duration::from_millis(100)).await;
2047            }
2048            let (sender, _) = receiver1.recv().await.unwrap();
2049            assert_eq!(sender, peer0.public_key());
2050        });
2051    }
2052
2053    #[test_traced]
2054    fn test_duplicate_addresses_disconnected() {
2055        // Ensure different dial orders explored by running with different seeds
2056        for seed in 0..25 {
2057            duplicate_addresses_disconnected(seed);
2058        }
2059    }
2060
2061    #[test_traced]
2062    fn test_duplicate_addresses_connected() {
2063        let base_port = 6000;
2064        let executor = deterministic::Runner::default();
2065        executor.start(|context| async move {
2066            let peer0 = ed25519::PrivateKey::from_seed(0);
2067            let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
2068            let wrong_socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 100);
2069            let peer1 = ed25519::PrivateKey::from_seed(1);
2070            let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
2071            let peer2 = ed25519::PrivateKey::from_seed(2);
2072            let socket2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 2);
2073
2074            // Start peer 0
2075            let config0 = Config::test(peer0.clone(), socket0, MAX_MESSAGE_SIZE);
2076            let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
2077            let (mut sender0, mut receiver0) =
2078                network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2079            network0.start();
2080
2081            // Start peer 2
2082            let config2 = Config::test(peer2.clone(), socket2, MAX_MESSAGE_SIZE);
2083            let (mut network2, mut oracle2) = Network::new(context.with_label("peer_2"), config2);
2084            let (_sender2, mut receiver2) =
2085                network2.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2086            network2.start();
2087
2088            // Create peer set with an incorrect address for peer 2
2089            let peer_set: Vec<(_, Address)> = vec![
2090                (peer0.public_key(), Address::Symmetric(socket0)),
2091                (peer1.public_key(), Address::Symmetric(socket1)),
2092                (peer2.public_key(), Address::Symmetric(socket1)),
2093            ];
2094            oracle0.track(0, peer_set.clone().try_into().unwrap()).await;
2095            oracle2.track(0, peer_set.try_into().unwrap()).await;
2096
2097            // Wait for connections to be made
2098            context.sleep(Duration::from_secs(30)).await;
2099
2100            // Peer 0 can send to peer 2
2101            loop {
2102                let sent = sender0
2103                    .send(Recipients::All, peer2.public_key().as_ref(), true)
2104                    .await
2105                    .unwrap();
2106                if sent.len() == 1 {
2107                    assert_eq!(sent[0], peer2.public_key());
2108                    break;
2109                }
2110                context.sleep(Duration::from_millis(100)).await;
2111            }
2112            let (sender, _) = receiver2.recv().await.unwrap();
2113            assert_eq!(sender, peer0.public_key());
2114
2115            // Start peer 1 (has duplicate but correct address)
2116            let config1 = Config::test(peer1.clone(), socket1, MAX_MESSAGE_SIZE);
2117            let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
2118            let (mut sender1, _receiver1) =
2119                network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2120            network1.start();
2121
2122            // Create peer set where peer 0 must dial peer 1 for connection
2123            let peer_set1: Vec<(_, Address)> = vec![
2124                (peer0.public_key(), Address::Symmetric(wrong_socket0)),
2125                (peer1.public_key(), Address::Symmetric(socket1)),
2126                (peer2.public_key(), Address::Symmetric(socket1)),
2127            ];
2128            oracle1.track(0, peer_set1.try_into().unwrap()).await;
2129
2130            // Wait for connections to be made
2131            context.sleep(Duration::from_secs(30)).await;
2132
2133            // Now peer 0 should connect to peer 1 at correct address and peer 2 should dial peer 1
2134            loop {
2135                let sent = sender1
2136                    .send(Recipients::All, peer1.public_key().as_ref(), true)
2137                    .await
2138                    .unwrap();
2139                if sent.len() == 2 {
2140                    assert!(sent.contains(&peer0.public_key()));
2141                    assert!(sent.contains(&peer2.public_key()));
2142                    break;
2143                }
2144                context.sleep(Duration::from_millis(100)).await;
2145            }
2146            let mut received0 = false;
2147            while let Ok((sender, _)) = receiver0.recv().await {
2148                // May have some items around from the initial send
2149                if sender == peer1.public_key() {
2150                    received0 = true;
2151                    break;
2152                }
2153            }
2154            assert!(received0);
2155            let (sender, _) = receiver2.recv().await.unwrap();
2156            assert_eq!(sender, peer1.public_key());
2157        });
2158    }
2159}