Skip to main content

commonware_p2p/authenticated/lookup/
mod.rs

1//! Communicate with a fixed set of authenticated peers with known addresses over encrypted connections.
2//!
3//! `lookup` provides multiplexed communication between fully-connected peers
4//! identified by a developer-specified cryptographic identity (i.e. BLS, ed25519, etc.).
5//! Unlike `discovery`, peers in `lookup` don't use a discovery mechanism to find each other;
6//! each peer's address is supplied by the application layer.
7//!
8//! # Features
9//!
10//! - Configurable Cryptography Scheme for Peer Identities (BLS, ed25519, etc.)
11//! - Multiplexing With Configurable Rate Limiting Per Channel and Send Prioritization
12//!
13//! # Design
14//!
15//! ## Discovery
16//!
17//! This module operates under the assumption that all peers are aware of and synchronized on
18//! the composition of peer sets at specific, user-provided indices (`u64`). Each index maps to a
19//! list of peer `PublicKey`/`SocketAddr` pairs (`(u64, Vec<(PublicKey, SocketAddr)>)`).
20//!
21//! On startup, the application supplies the initial set of peers. The [`Oracle`] implements
22//! [`AddressableManager`](crate::AddressableManager) which provides two ways to update peer addresses:
23//!
24//! - [`AddressableManager::track`](crate::AddressableManager::track): Track a new peer set at a
25//!   monotonically increasing index. Use this when the peer set composition changes (peers added/removed).
26//!   This accepts either a list of primary peers or an
27//!   [`AddressableTrackedPeers`](crate::AddressableTrackedPeers) value containing both primary and
28//!   secondary peers.
29//! - [`AddressableManager::overwrite`](crate::AddressableManager::overwrite): Update multiple
30//!   peers' addresses in-place without creating a new peer set. Use this when only peer IPs change but
31//!   the peer set composition stays the same. Peers not in the directory (or unchanged) are silently skipped (so the application doesn't
32//!   need to remember what their last submitted peer set was).
33//!
34//! Secondary peers remain visible in [`PeerSetUpdate`](crate::PeerSetUpdate)
35//! notifications, are accepted for inbound connections, and may receive
36//! `Recipients::All` traffic on established connections, but outbound dialing
37//! is restricted to primary peers.
38//!
39//! Any inbound connection attempts from an IP address that is not in the union of all registered
40//! primary or secondary peers will be rejected.
41//!
42//! ## Messages
43//!
44//! Application-level data is exchanged using the `Data` message type. This structure contains:
45//! - `channel`: A `u32` identifier used to route the message to the correct application handler.
46//! - `message`: The arbitrary application payload as `IoBuf`.
47//!
48//! The size of the `message` bytes must not exceed the configured
49//! `max_message_size`. If it does, the sending operation will panic. Messages can be sent with `priority`, allowing certain
50//! communications to potentially bypass lower-priority messages waiting in send queues across all
51//! channels. Each registered channel ([Sender], [Receiver]) handles its own message queuing
52//! and rate limiting.
53//!
54//! ## Compression
55//!
56//! Stream compression is not provided at the transport layer to avoid inadvertently
57//! enabling known attacks such as BREACH and CRIME. These attacks exploit the interaction
58//! between compression and encryption by analyzing patterns in the resulting data.
59//! By compressing secrets alongside attacker-controlled content, these attacks can infer
60//! sensitive information through compression ratio analysis. Applications that choose
61//! to compress data should do so with full awareness of these risks and implement
62//! appropriate mitigations (such as ensuring no attacker-controlled data is compressed
63//! alongside sensitive information).
64//!
65//! ## Batching
66//!
67//! Applications seeking higher performance should prefer batching messages
68//! above `p2p`. Larger application-level batches amortize per-message
69//! encryption overhead and, if the application also compresses its payloads,
70//! can improve compression ratio.
71//!
72//! ## Rate Limiting
73//!
74//! There are five primary rate limits:
75//!
76//! - `max_concurrent_handshakes`: The maximum number of concurrent handshake attempts allowed.
77//! - `allowed_handshake_rate_per_ip`: The rate limit for handshake attempts originating from a single IP address.
78//! - `allowed_handshake_rate_per_subnet`: The rate limit for handshake attempts originating from a single IP subnet.
79//! - `peer_connection_cooldown`: The per-peer rate limit for inbound and outbound connection reservations, expressed as a minimum cooldown between attempts.
80//! - `rate` (per channel): The rate limit for messages sent on a single channel.
81//!
82//! _Users should consider these rate limits as best-effort protection against moderate abuse. Targeted abuse (e.g. DDoS)
83//! must be mitigated with an external proxy (that limits inbound connection attempts to authorized IPs)._
84//!
85//! ## IP Poisoning
86//!
87//! A malicious peer can claim an ingress [std::net::SocketAddr] that collides with an honest
88//! peer, drawing invalid dial attempts to
89//! the honest peer (where we expect the malicious public key rather than the honest public key).
90//!
91//! Because we rate limit inbound connection attempts per IP/subnet, this poisoning can lead to us dropping legitimate
92//! dial attempts (if quota was already exhausted on useless dial attempts). Recall, an honest dialer doesn't know which public
93//! key actually resides at an address and must try all that collide.
94//!
95//! To mitigate this issue, we shuffle peer dial order on each dial queue refresh. This ensures we eventually dial a poisoned
96//! IP with the correct public key before hitting the rate limit imposed by the listener at said IP.
97//!
98//! _Applications that wish to entirely prevent this class of attack can assert uniqueness of
99//! ingress [std::net::SocketAddr] during
100//! peer registration._
101//!
102//! ## Message Delivery
103//!
104//! Outgoing message submissions can be rejected when a peer's send buffer is full, preventing slow
105//! peers from blocking sends to other peers. Incoming messages are dropped when the application's
106//! receive buffer is full, ensuring ping messages continue to flow and connections remain healthy.
107//!
108//! # Example
109//!
110//! ```rust
111//! use commonware_p2p::{authenticated::lookup::{self, Network}, Address, AddressableManager, Sender, Recipients};
112//! use commonware_cryptography::{ed25519, Signer, PrivateKey as _, PublicKey as _, };
113//! use commonware_runtime::{deterministic, IoBuf, Metrics, Quota, Runner, Spawner, Supervisor};
114//! use commonware_utils::{NZU32, ordered::Map};
115//! use std::net::{IpAddr, Ipv4Addr, SocketAddr};
116//!
117//! // Configure context
118//! let runtime_cfg = deterministic::Config::default();
119//! let runner = deterministic::Runner::new(runtime_cfg);
120//!
121//! // Generate identity
122//! //
123//! // In production, the signer should be generated from a secure source of entropy.
124//! let my_sk = ed25519::PrivateKey::from_seed(0);
125//! let my_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
126//!
127//! // Generate peers
128//! //
129//! // In production, peer identities will be provided by some external source of truth
130//! // (like the staking set of a blockchain).
131//! let peer1 = ed25519::PrivateKey::from_seed(1).public_key();
132//! let peer1_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3001);
133//! let peer2 = ed25519::PrivateKey::from_seed(2).public_key();
134//! let peer2_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3002);
135//! let peer3 = ed25519::PrivateKey::from_seed(3).public_key();
136//! let peer3_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3003);
137//!
138//! // Configure namespace
139//! //
140//! // In production, use a unique application namespace to prevent cryptographic replay attacks.
141//! let application_namespace = b"my-app-namespace";
142//!
143//! // Configure network
144//! //
145//! // In production, use a more conservative configuration like `Config::recommended`.
146//! const MAX_MESSAGE_SIZE: u32 = 1_024; // 1KB
147//! let p2p_cfg = lookup::Config::local(
148//!     my_sk.clone(),
149//!     application_namespace,
150//!     my_addr,
151//!     MAX_MESSAGE_SIZE,
152//! );
153//!
154//! // Start context
155//! runner.start(|context| async move {
156//!     // Initialize network
157//!     let (mut network, mut oracle) = Network::new(context.child("network"), p2p_cfg);
158//!
159//!     // Register authorized peers
160//!     //
161//!     // In production, this would be updated as new peer sets are created (like when
162//!     // the composition of a validator set changes).
163//!     let peers: Map<_, Address> = [
164//!         (my_sk.public_key(), my_addr.into()),
165//!         (peer1, peer1_addr.into()),
166//!         (peer2, peer2_addr.into()),
167//!         (peer3, peer3_addr.into()),
168//!     ].try_into().unwrap();
169//!     oracle.track(0, peers);
170//!
171//!     // Register some channel
172//!     const MAX_MESSAGE_BACKLOG: usize = 128;
173//!     let (mut sender, receiver) = network.register(
174//!         0,
175//!         Quota::per_second(NZU32!(1)),
176//!         MAX_MESSAGE_BACKLOG,
177//!     );
178//!
179//!     // Run network
180//!     network.start();
181//!
182//!     // Example: Use sender
183//!     let _ = sender.send(Recipients::All, IoBuf::from(b"hello"), false);
184//!
185//!     // Graceful shutdown (stops all spawned tasks)
186//!     context.stop(0, None).await.unwrap();
187//! });
188//! ```
189
190mod actors;
191mod channels;
192mod config;
193mod metrics;
194mod network;
195mod types;
196
197use thiserror::Error;
198
199/// Errors that can occur when interacting with the network.
200#[derive(Error, Debug)]
201pub enum Error {
202    #[error("network closed")]
203    NetworkClosed,
204}
205
206pub use actors::tracker::Oracle;
207pub use channels::{Receiver, Sender};
208pub use config::Config;
209pub use network::Network;
210
211#[cfg(test)]
212mod tests {
213    use super::*;
214    use crate::{
215        authenticated::{
216            lookup::actors::router::{Actor as RouterActor, Config as RouterConfig},
217            relay::Relay,
218        },
219        Address, AddressableManager, CheckedSender as _, Ingress, LimitedSender as _, Provider,
220        Receiver, Recipients, Sender,
221    };
222    use commonware_actor::{Feedback, Unreliable};
223    use commonware_cryptography::{ed25519, Signer as _};
224    use commonware_macros::{select, test_group, test_traced};
225    use commonware_runtime::{
226        deterministic, telemetry::metrics::count_running_tasks, tokio, BufferPooler, Clock, IoBuf,
227        Metrics, Network as RNetwork, Quota, Resolver, Runner, Spawner, Supervisor as _,
228    };
229    use commonware_utils::{
230        channel::mpsc,
231        hostname,
232        ordered::{Map, Set},
233        Hostname, NZUsize, TryCollect, NZU32,
234    };
235    use rand_core::{CryptoRngCore, RngCore};
236    use std::{
237        collections::HashSet,
238        net::{IpAddr, Ipv4Addr, SocketAddr},
239        time::Duration,
240    };
241
242    #[derive(Copy, Clone)]
243    enum Mode {
244        All,
245        Some,
246        One,
247    }
248
249    const MAX_MESSAGE_SIZE: u32 = 1_024 * 1_024; // 1MB
250    const DEFAULT_MESSAGE_BACKLOG: usize = 128;
251
252    /// Ensure no message rate limiting occurred.
253    ///
254    /// If a message is rate limited, it would be formatted as:
255    ///
256    /// ```text
257    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="data_0"} 1
258    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="ping"} 1
259    /// ```
260    fn assert_no_rate_limiting(metrics: &str) {
261        assert!(
262            !metrics.contains("messages_rate_limited_total{"),
263            "no messages should be rate limited: {metrics}"
264        );
265    }
266
267    /// Test connectivity between `n` peers.
268    ///
269    /// We set a unique `base_port` for each test to avoid "address already in use"
270    /// errors when tests are run immediately after each other.
271    async fn run_network(
272        context: impl Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
273        max_message_size: u32,
274        base_port: u16,
275        n: usize,
276        mode: Mode,
277    ) {
278        // Create peers
279        let mut peers_and_sks = Vec::new();
280        for i in 0..n {
281            let private_key = ed25519::PrivateKey::from_seed(i as u64);
282            let public_key = private_key.public_key();
283            let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
284            peers_and_sks.push((private_key, public_key, address));
285        }
286        let peers: Vec<(ed25519::PublicKey, Address)> = peers_and_sks
287            .iter()
288            .map(|(_, pub_key, addr)| (pub_key.clone(), (*addr).into()))
289            .collect::<Vec<_>>();
290
291        // Create networks
292        let (complete_sender, mut complete_receiver) = mpsc::channel(peers.len());
293        for (i, (private_key, public_key, address)) in peers_and_sks.iter().enumerate() {
294            let public_key = public_key.clone();
295
296            // Create peer context
297            let context = context.child("peer").with_attribute("index", i);
298
299            // Create network
300            let config = Config::test(private_key.clone(), *address, max_message_size);
301            let (mut network, mut oracle) = Network::new(context.child("network"), config);
302
303            // Register peers
304            oracle.track(0, Map::try_from(peers.clone()).unwrap());
305
306            // Register basic application
307            let (mut sender, mut receiver) =
308                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
309
310            // Wait to connect to all peers, and then send messages to everyone
311            network.start();
312
313            // Send/Receive messages
314            context.child("agent").spawn({
315                let complete_sender = complete_sender.clone();
316                let peers = peers.clone();
317                move |context| async move {
318                    // Wait for all peers to send their identity
319                    let receiver = context.child("receiver").spawn(move |_| async move {
320                        // Wait for all peers to send their identity
321                        let mut received = HashSet::new();
322                        while received.len() < n - 1 {
323                            // Ensure message equals sender identity
324                            let (sender, message) = receiver.recv().await.unwrap();
325                            assert_eq!(message, sender.as_ref());
326
327                            // Add to received set
328                            received.insert(sender);
329                        }
330                        complete_sender.send(()).await.unwrap();
331
332                        // Process messages until all finished (or else sender loops could get stuck as a peer may drop)
333                        loop {
334                            receiver.recv().await.unwrap();
335                        }
336                    });
337
338                    // Send identity to all peers
339                    let sender = context.child("sender").spawn(move |context| async move {
340                        // Get all peers not including self
341                        let mut recipients: Vec<_> = peers
342                            .iter()
343                            .enumerate()
344                            .filter(|(j, _)| i != *j)
345                            .map(|(_, (pk, _))| pk.clone())
346                            .collect();
347                        recipients.sort();
348
349                        // Loop forever to account for unexpected message drops
350                        loop {
351                            match mode {
352                                Mode::One => {
353                                    for pub_key in &recipients {
354                                        // Loop until success
355                                        loop {
356                                            let sent = sender.send(
357                                                Recipients::One(pub_key.clone()),
358                                                public_key.as_ref().to_vec(),
359                                                true,
360                                            );
361                                            if sent.len() != 1 {
362                                                context.sleep(Duration::from_millis(100)).await;
363                                                continue;
364                                            }
365                                            break;
366                                        }
367                                    }
368                                }
369                                Mode::Some | Mode::All => {
370                                    // Loop until all peer sends successful
371                                    loop {
372                                        let sent = sender.send(
373                                            match mode {
374                                                Mode::Some => Recipients::Some(recipients.clone()),
375                                                Mode::All => Recipients::All,
376                                                _ => unreachable!(),
377                                            },
378                                            public_key.as_ref().to_vec(),
379                                            true,
380                                        );
381                                        if sent.len() != recipients.len() {
382                                            context.sleep(Duration::from_millis(100)).await;
383                                            continue;
384                                        }
385                                        break;
386                                    }
387                                }
388                            };
389
390                            // Sleep to avoid busy loop
391                            context.sleep(Duration::from_secs(10)).await;
392                        }
393                    });
394
395                    // Neither task should exit
396                    select! {
397                        receiver = receiver => {
398                            panic!("receiver exited: {receiver:?}");
399                        },
400                        sender = sender => {
401                            panic!("sender exited: {sender:?}");
402                        },
403                    }
404                }
405            });
406        }
407
408        // Wait for all peers to finish
409        for _ in 0..n {
410            complete_receiver.recv().await.unwrap();
411        }
412
413        // Ensure no message rate limiting occurred
414        assert_no_rate_limiting(&context.encode());
415    }
416
417    fn run_deterministic_test(seed: u64, mode: Mode) {
418        // Configure test
419        const NUM_PEERS: usize = 25;
420        const BASE_PORT: u16 = 3000;
421
422        // Run first instance
423        let executor = deterministic::Runner::seeded(seed);
424        let state = executor.start(|context| async move {
425            run_network(
426                context.child("network"),
427                MAX_MESSAGE_SIZE,
428                BASE_PORT,
429                NUM_PEERS,
430                mode,
431            )
432            .await;
433            context.auditor().state()
434        });
435
436        // Compare result to second instance
437        let executor = deterministic::Runner::seeded(seed);
438        let state2 = executor.start(|context| async move {
439            run_network(
440                context.child("network"),
441                MAX_MESSAGE_SIZE,
442                BASE_PORT,
443                NUM_PEERS,
444                mode,
445            )
446            .await;
447            context.auditor().state()
448        });
449        assert_eq!(state, state2);
450    }
451
452    #[test_group("slow")]
453    #[test_traced]
454    fn test_determinism_one() {
455        for i in 0..10 {
456            run_deterministic_test(i, Mode::One);
457        }
458    }
459
460    #[test_group("slow")]
461    #[test_traced]
462    fn test_determinism_some() {
463        for i in 0..10 {
464            run_deterministic_test(i, Mode::Some);
465        }
466    }
467
468    #[test_group("slow")]
469    #[test_traced]
470    fn test_determinism_all() {
471        for i in 0..10 {
472            run_deterministic_test(i, Mode::All);
473        }
474    }
475
476    #[test_traced]
477    fn test_tokio_connectivity() {
478        let executor = tokio::Runner::default();
479        executor.start(|context| async move {
480            let base_port = 4000;
481            let n = 10;
482            run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
483        });
484    }
485
486    #[test_traced]
487    fn test_multi_index_oracle() {
488        // Configure test
489        let base_port = 3000;
490        let n: usize = 10;
491
492        // Initialize context
493        let executor = deterministic::Runner::default();
494        executor.start(|context| async move {
495            // Create peers
496            let mut peers_and_sks = Vec::new();
497            for i in 0..n {
498                let sk = ed25519::PrivateKey::from_seed(i as u64);
499                let pk = sk.public_key();
500                let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
501                peers_and_sks.push((sk, pk, addr));
502            }
503            let peers = peers_and_sks
504                .iter()
505                .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
506                .collect::<Vec<_>>();
507
508            // Create networks
509            let mut waiters = Vec::new();
510            for (i, (peer_sk, peer_pk, peer_addr)) in peers_and_sks.iter().enumerate() {
511                // Create peer context
512                let context = context.child("peer").with_attribute("index", i);
513
514                // Create network
515                let config = Config::test(
516                    peer_sk.clone(),
517                    *peer_addr,
518                    1_024 * 1_024, // 1MB
519                );
520                let (mut network, mut oracle) = Network::new(context.child("network"), config);
521
522                // Register peers at separate indices
523                oracle.track(0, Map::try_from([peers[0].clone()]).unwrap());
524                oracle.track(
525                    1,
526                    Map::try_from([peers[1].clone(), peers[2].clone()]).unwrap(),
527                );
528                oracle.track(
529                    2,
530                    peers
531                        .iter()
532                        .skip(2)
533                        .cloned()
534                        .try_collect::<Map<_, _>>()
535                        .unwrap(),
536                );
537
538                // Register basic application
539                let (mut sender, mut receiver) =
540                    network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
541
542                // Wait to connect to all peers, and then send messages to everyone
543                network.start();
544
545                // Send/Receive messages
546                let msg = peer_pk.clone();
547                let handler = context.child("agent").spawn(move |context| async move {
548                    if i == 0 {
549                        // Loop until success
550                        loop {
551                            if sender
552                                .send(Recipients::All, msg.as_ref().to_vec(), true)
553                                .len()
554                                == n - 1
555                            {
556                                break;
557                            }
558
559                            // Sleep and try again (avoid busy loop)
560                            context.sleep(Duration::from_millis(100)).await;
561                        }
562                    } else {
563                        // Ensure message equals sender identity
564                        let (sender, message) = receiver.recv().await.unwrap();
565                        assert_eq!(message, sender.as_ref());
566                    }
567                });
568
569                // Add to waiters
570                waiters.push(handler);
571            }
572
573            // Wait for waiters to finish (receiver before sender)
574            for waiter in waiters.into_iter().rev() {
575                waiter.await.unwrap();
576            }
577
578            // Ensure no message rate limiting occurred
579            assert_no_rate_limiting(&context.encode());
580        });
581    }
582
583    #[test_traced]
584    #[should_panic(expected = "message too large")]
585    fn test_message_too_large() {
586        // Configure test
587        let base_port = 3000;
588        let n: usize = 2;
589
590        // Initialize context
591        let executor = deterministic::Runner::seeded(0);
592        executor.start(|mut context| async move {
593            // Create peers
594            let mut peers_and_sks = Vec::new();
595            for i in 0..n {
596                let peer_sk = ed25519::PrivateKey::from_seed(i as u64);
597                let peer_pk = peer_sk.public_key();
598                let peer_addr =
599                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
600                peers_and_sks.push((peer_sk, peer_pk, peer_addr));
601            }
602            let peers: Map<_, _> = peers_and_sks
603                .iter()
604                .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
605                .try_collect()
606                .unwrap();
607
608            // Create network
609            let (sk, _, addr) = peers_and_sks[0].clone();
610            let config = Config::test(
611                sk,
612                addr,
613                1_024 * 1_024, // 1MB
614            );
615            let (mut network, mut oracle) = Network::new(context.child("network"), config);
616
617            // Register peers
618            oracle.track(0, peers.clone());
619
620            // Register basic application
621            let (mut sender, _) =
622                network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
623
624            // Wait to connect to all peers, and then send messages to everyone
625            network.start();
626
627            // Crate random message
628            let mut msg = vec![0u8; 10 * 1024 * 1024]; // 10MB (greater than frame capacity)
629            context.fill_bytes(&mut msg[..]);
630
631            // Send message
632            let recipient = Recipients::One(peers[1].clone());
633            sender.send(recipient, msg, true);
634        });
635    }
636
637    #[test_traced]
638    fn test_rate_limiting() {
639        // Configure test
640        let base_port = 3000;
641        let n: usize = 2;
642
643        // Initialize context
644        let executor = deterministic::Runner::seeded(0);
645        executor.start(|context| async move {
646            // Create peers
647            let mut peers_and_sks = Vec::new();
648            for i in 0..n {
649                let sk = ed25519::PrivateKey::from_seed(i as u64);
650                let pk = sk.public_key();
651                let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
652                peers_and_sks.push((sk, pk, addr));
653            }
654            let peers: Map<_, _> = peers_and_sks
655                .iter()
656                .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
657                .try_collect()
658                .unwrap();
659            let (sk0, _, addr0) = peers_and_sks[0].clone();
660            let (sk1, pk1, addr1) = peers_and_sks[1].clone();
661
662            // Create network for peer 0
663            let config0 = Config::test(sk0, addr0, 1_024 * 1_024); // 1MB
664            let (mut network0, mut oracle0) =
665                Network::new(context.child("peer").with_attribute("index", 0), config0);
666            oracle0.track(0, peers.clone());
667            let (mut sender0, _receiver0) =
668                network0.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
669            network0.start();
670
671            // Create network for peer 1
672            let config1 = Config::test(sk1, addr1, 1_024 * 1_024); // 1MB
673            let (mut network1, mut oracle1) =
674                Network::new(context.child("peer").with_attribute("index", 1), config1);
675            oracle1.track(0, peers.clone());
676            let (_sender1, _receiver1) =
677                network1.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
678            network1.start();
679
680            // Send first message, which should be allowed and consume the quota.
681            let msg = vec![0u8; 1024]; // 1KB
682            loop {
683                // Confirm message is sent to peer
684                let checked = sender0.check(Recipients::All).unwrap();
685                if !checked.recipients().is_empty() {
686                    checked.send(msg.clone(), true);
687                    break;
688                }
689
690                // Ensure we don't rate limit outbound sends while
691                // waiting for peers to connect
692                context.sleep(Duration::from_mins(1)).await
693            }
694
695            // Immediately send the second message to trigger the rate limit.
696            let sent = sender0.send(Recipients::One(pk1), msg, true);
697            assert!(sent.is_empty());
698
699            // Give the metrics time to reflect the rate-limited message.
700            for _ in 0..10 {
701                assert_no_rate_limiting(&context.encode());
702                context.sleep(Duration::from_millis(100)).await;
703            }
704        });
705    }
706
707    #[test_traced]
708    fn test_unordered_peer_sets() {
709        let (n, base_port) = (10, 3000);
710        let executor = deterministic::Runner::default();
711        executor.start(|context| async move {
712            // Create peers
713            let mut peers_and_sks = Vec::new();
714            for i in 0..n {
715                let sk = ed25519::PrivateKey::from_seed(i as u64);
716                let pk = sk.public_key();
717                let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
718                peers_and_sks.push((sk, pk, addr));
719            }
720            let peer0 = peers_and_sks[0].clone();
721            let config = Config::test(peer0.0, peer0.2, 1_024 * 1_024);
722            let (network, mut oracle) = Network::new(context.child("network"), config);
723            network.start();
724
725            // Subscribe to peer sets
726            let mut subscription = oracle.subscribe().await;
727
728            // Register initial peer set
729            let set10: Map<_, _> = peers_and_sks
730                .iter()
731                .take(2)
732                .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
733                .try_collect()
734                .unwrap();
735            oracle.track(10, set10.clone());
736            let update = subscription.recv().await.unwrap();
737            assert_eq!(update.index, 10);
738            assert_eq!(&update.latest.primary, set10.keys());
739            assert!(update.latest.secondary.is_empty());
740            assert_eq!(&update.all.primary, set10.keys());
741            assert!(update.all.secondary.is_empty());
742
743            // Register old peer sets (ignored)
744            let set9: Map<_, _> = peers_and_sks
745                .iter()
746                .skip(2)
747                .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
748                .try_collect()
749                .unwrap();
750            oracle.track(9, set9.clone());
751
752            // Add new peer set
753            let set11: Map<_, _> = peers_and_sks
754                .iter()
755                .skip(4)
756                .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
757                .try_collect()
758                .unwrap();
759            oracle.track(11, set11.clone());
760            let update = subscription.recv().await.unwrap();
761            assert_eq!(update.index, 11);
762            assert_eq!(&update.latest.primary, set11.keys());
763            assert!(update.latest.secondary.is_empty());
764            let all_keys: Set<_> = set10
765                .into_keys()
766                .into_iter()
767                .chain(set11.into_keys().into_iter())
768                .try_collect()
769                .unwrap();
770            assert_eq!(update.all.primary, all_keys);
771            assert!(update.all.secondary.is_empty());
772        });
773    }
774
775    #[test_traced]
776    fn test_graceful_shutdown() {
777        let base_port = 3000;
778        let n: usize = 5;
779
780        let executor = deterministic::Runner::default();
781        executor.start(|context| async move {
782            // Create peers
783            let mut peers_and_sks = Vec::new();
784            for i in 0..n {
785                let sk = ed25519::PrivateKey::from_seed(i as u64);
786                let pk = sk.public_key();
787                let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
788                peers_and_sks.push((sk, pk, addr));
789            }
790            let peers: Map<_, _> = peers_and_sks
791                .iter()
792                .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
793                .try_collect()
794                .unwrap();
795
796            // Create networks for all peers
797            let (complete_sender, mut complete_receiver) = mpsc::channel(n);
798            for (i, (sk, pk, addr)) in peers_and_sks.iter().enumerate() {
799                let peer_context = context.child("peer").with_attribute("index", i);
800                let config = Config::test(sk.clone(), *addr, 1_024 * 1_024);
801                let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
802
803                // Register peer set
804                oracle.track(0, peers.clone());
805
806                let (mut sender, mut receiver) =
807                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
808                network.start();
809
810                peer_context.child("agent").spawn({
811                    let complete_sender = complete_sender.clone();
812                    let pk = pk.clone();
813                    move |context| async move {
814                        // Wait to connect to at least one other peer
815                        let expected_connections = if i == 0 { n - 1 } else { 1 };
816
817                        // Send a message
818                        loop {
819                            let sent = sender.send(Recipients::All, pk.as_ref().to_vec(), true);
820                            if sent.len() >= expected_connections {
821                                break;
822                            }
823                            context.sleep(Duration::from_millis(100)).await;
824                        }
825
826                        // Signal that this peer is connected
827                        complete_sender.send(()).await.unwrap();
828
829                        // Keep receiving messages until shutdown
830                        loop {
831                            select! {
832                                result = receiver.recv() => {
833                                    if result.is_err() {
834                                        // Channel closed due to shutdown
835                                        break;
836                                    }
837                                },
838                                _ = context.stopped() => {
839                                    // Graceful shutdown signal received
840                                    break;
841                                },
842                            }
843                        }
844                    }
845                });
846            }
847
848            // Wait for all peers to establish connectivity
849            for _ in 0..n {
850                complete_receiver.recv().await.unwrap();
851            }
852
853            // Verify that network actors started for all peers
854            let metrics_before = context.encode();
855            let tasks_running = |metrics: &str, name: &str| -> Option<u64> {
856                metrics.lines().find_map(|line| {
857                    (line.starts_with("runtime_tasks_running{")
858                        && line.contains(&format!("name=\"{name}\""))
859                        && line.contains("kind=\"Task\""))
860                    .then(|| {
861                        line.split_whitespace()
862                            .next_back()
863                            .expect("metric line must have a value")
864                            .parse::<u64>()
865                            .expect("running task count must be an integer")
866                    })
867                })
868            };
869
870            for actor in ["tracker", "router", "spawner", "listener", "dialer"] {
871                let name = format!("peer_network_{actor}");
872                assert_eq!(
873                    tasks_running(&metrics_before, &name),
874                    Some(n as u64),
875                    "{name} should have {n} running tasks before shutdown"
876                );
877            }
878
879            // All peers are connected - now trigger graceful shutdown
880            context.child("shutdown").spawn(move |context| async move {
881                // Trigger graceful shutdown
882                let result = context.stop(0, Some(Duration::from_secs(5))).await;
883
884                // Shutdown should complete successfully without timeout
885                assert!(
886                    result.is_ok(),
887                    "graceful shutdown should complete: {result:?}"
888                );
889            });
890
891            // Wait for shutdown to complete
892            context.stopped().await.unwrap();
893
894            // Give the runtime a tick to process task completions and update metrics
895            context.sleep(Duration::from_millis(100)).await;
896
897            // Verify that all network actors stopped
898            let metrics_after = context.encode();
899            for actor in ["tracker", "router", "spawner", "listener", "dialer"] {
900                let name = format!("peer_network_{actor}");
901                assert_eq!(
902                    tasks_running(&metrics_after, &name),
903                    Some(0),
904                    "{name} should be stopped"
905                );
906            }
907        });
908    }
909
910    #[test_traced]
911    fn test_subscription_includes_self_when_registered() {
912        let base_port = 3000;
913        let executor = deterministic::Runner::default();
914        executor.start(|context| async move {
915            // Create self (peer0) and other peers
916            let self_sk = ed25519::PrivateKey::from_seed(0);
917            let self_pk = self_sk.public_key();
918            let self_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
919
920            let other_pk = ed25519::PrivateKey::from_seed(1).public_key();
921            let other_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
922
923            // Create network for peer0 (self)
924            let config = Config::test(self_sk, self_addr, 1_024 * 1_024);
925            let (network, mut oracle) = Network::new(context.child("network"), config);
926            network.start();
927
928            // Subscribe to peer sets
929            let mut subscription = oracle.subscribe().await;
930
931            // Register a peer set that does NOT include self
932            let peer_set: Map<_, _> = [(other_pk.clone(), other_addr.into())].try_into().unwrap();
933            oracle.track(1, peer_set.clone());
934
935            // Receive subscription notification
936            let update = subscription.recv().await.unwrap();
937            assert_eq!(update.index, 1);
938            assert_eq!(update.latest.primary.len(), 1);
939            assert_eq!(update.all.primary.len(), 1);
940            assert!(update.all.secondary.is_empty());
941
942            // Self should NOT be in the new set
943            assert!(
944                update.latest.primary.position(&self_pk).is_none(),
945                "new set should not include self"
946            );
947            assert!(
948                update.latest.primary.position(&other_pk).is_some(),
949                "new set should include other"
950            );
951            assert!(
952                update.latest.secondary.is_empty(),
953                "new secondary set should be empty"
954            );
955
956            // Self should NOT be in the peer set (not tracked)
957            assert!(
958                update.all.primary.position(&self_pk).is_none(),
959                "peer set should not include self"
960            );
961            assert!(
962                update.all.primary.position(&other_pk).is_some(),
963                "peer set should include other"
964            );
965
966            // Now register a peer set that DOES include self
967            let peer_set: Map<_, _> = [
968                (self_pk.clone(), self_addr.into()),
969                (other_pk.clone(), other_addr.into()),
970            ]
971            .try_into()
972            .unwrap();
973            oracle.track(2, peer_set.clone());
974
975            // Receive subscription notification
976            let update = subscription.recv().await.unwrap();
977            assert_eq!(update.index, 2);
978            assert_eq!(update.latest.primary.len(), 2);
979            assert_eq!(update.all.primary.len(), 2);
980            assert!(update.all.secondary.is_empty());
981
982            // Both peers should be in the new set
983            assert!(
984                update.latest.primary.position(&self_pk).is_some(),
985                "new set should include self"
986            );
987            assert!(
988                update.latest.primary.position(&other_pk).is_some(),
989                "new set should include other"
990            );
991            assert!(
992                update.latest.secondary.is_empty(),
993                "new secondary set should be empty"
994            );
995
996            // Both peers should be in the peer set
997            assert!(
998                update.all.primary.position(&self_pk).is_some(),
999                "peer set should include self"
1000            );
1001            assert!(
1002                update.all.primary.position(&other_pk).is_some(),
1003                "peer set should include other"
1004            );
1005        });
1006    }
1007
1008    #[test_traced]
1009    fn test_dns_peer_addresses() {
1010        let base_port = 3200;
1011        let n: usize = 3;
1012
1013        let executor = deterministic::Runner::default();
1014        executor.start(|context| async move {
1015            // Create peers
1016            let mut peers_and_sks = Vec::new();
1017            for i in 0..n {
1018                let private_key = ed25519::PrivateKey::from_seed(i as u64);
1019                let public_key = private_key.public_key();
1020                let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
1021                let host_str = format!("peer-{i}.local");
1022                let host = Hostname::new(&host_str).unwrap();
1023                peers_and_sks.push((private_key, public_key, socket, host_str, host));
1024            }
1025
1026            // Register DNS mappings for all peers
1027            for (_, _, socket, host_str, _) in &peers_and_sks {
1028                context.resolver_register(host_str.clone(), Some(vec![socket.ip()]));
1029            }
1030
1031            // Create peer addresses with DNS ingress
1032            let peers: Vec<(_, Address)> = peers_and_sks
1033                .iter()
1034                .map(|(_, pk, socket, _, host)| {
1035                    (
1036                        pk.clone(),
1037                        Address::Asymmetric {
1038                            ingress: Ingress::Dns {
1039                                host: host.clone(),
1040                                port: socket.port(),
1041                            },
1042                            egress: *socket,
1043                        },
1044                    )
1045                })
1046                .collect();
1047
1048            // Create networks
1049            let (complete_sender, mut complete_receiver) = mpsc::channel(n);
1050            for (i, (private_key, public_key, socket, _, _)) in peers_and_sks.iter().enumerate() {
1051                let context = context.child("peer").with_attribute("index", i);
1052
1053                // Create network
1054                let config = Config::test(private_key.clone(), *socket, 1_024 * 1_024);
1055                let (mut network, mut oracle) = Network::new(context.child("network"), config);
1056
1057                // Register peers with DNS addresses
1058                oracle.track(0, Map::try_from(peers.clone()).unwrap());
1059
1060                // Register channel
1061                let (mut sender, mut receiver) =
1062                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1063
1064                network.start();
1065
1066                // Send/Receive messages
1067                let pk = public_key.clone();
1068                context.child("agent").spawn({
1069                    let complete_sender = complete_sender.clone();
1070                    let peers = peers.clone();
1071                    move |context| async move {
1072                        // Wait for messages from other peers
1073                        let receiver = context.child("receiver").spawn(move |_| async move {
1074                            let mut received = HashSet::new();
1075                            while received.len() < n - 1 {
1076                                let (sender, message) = receiver.recv().await.unwrap();
1077                                assert_eq!(message, sender.as_ref());
1078                                received.insert(sender);
1079                            }
1080                            complete_sender.send(()).await.unwrap();
1081
1082                            loop {
1083                                receiver.recv().await.unwrap();
1084                            }
1085                        });
1086
1087                        // Send identity to all peers
1088                        let sender_task =
1089                            context.child("sender").spawn(move |context| async move {
1090                                loop {
1091                                    let mut recipients: Vec<_> = peers
1092                                        .iter()
1093                                        .filter(|(p, _)| p != &pk)
1094                                        .map(|(p, _)| p.clone())
1095                                        .collect();
1096                                    recipients.sort();
1097
1098                                    loop {
1099                                        let mut sent = sender.send(
1100                                            Recipients::All,
1101                                            pk.as_ref().to_vec(),
1102                                            true,
1103                                        );
1104                                        if sent.len() != n - 1 {
1105                                            context.sleep(Duration::from_millis(100)).await;
1106                                            continue;
1107                                        }
1108                                        sent.sort();
1109                                        assert_eq!(sent, recipients);
1110                                        break;
1111                                    }
1112
1113                                    context.sleep(Duration::from_secs(10)).await;
1114                                }
1115                            });
1116
1117                        select! {
1118                            receiver = receiver => {
1119                                panic!("receiver exited: {receiver:?}")
1120                            },
1121                            sender = sender_task => {
1122                                panic!("sender exited: {sender:?}")
1123                            },
1124                        }
1125                    }
1126                });
1127            }
1128
1129            // Wait for all peers to exchange messages
1130            for _ in 0..n {
1131                complete_receiver.recv().await.unwrap();
1132            }
1133
1134            assert_no_rate_limiting(&context.encode());
1135        });
1136    }
1137
1138    #[test_traced]
1139    fn test_mixed_socket_and_dns_addresses() {
1140        let base_port = 3300;
1141        let n: usize = 4;
1142
1143        let executor = deterministic::Runner::default();
1144        executor.start(|context| async move {
1145            // Create peers
1146            let mut peers_and_sks = Vec::new();
1147            for i in 0..n {
1148                let private_key = ed25519::PrivateKey::from_seed(i as u64);
1149                let public_key = private_key.public_key();
1150                let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
1151                peers_and_sks.push((private_key, public_key, socket));
1152            }
1153
1154            // Register DNS mappings for peers 2 and 3 only
1155            for (i, (_, _, socket)) in peers_and_sks.iter().enumerate().skip(2) {
1156                context.resolver_register(format!("peer-{i}.local"), Some(vec![socket.ip()]));
1157            }
1158
1159            // Create peer addresses - peers 0,1 use Symmetric, peers 2,3 use DNS Asymmetric
1160            let peers: Vec<(_, Address)> = peers_and_sks
1161                .iter()
1162                .enumerate()
1163                .map(|(i, (_, pk, socket))| {
1164                    let addr = if i < 2 {
1165                        // Peers 0, 1: Symmetric socket address
1166                        Address::Symmetric(*socket)
1167                    } else {
1168                        // Peers 2, 3: Asymmetric with DNS ingress
1169                        Address::Asymmetric {
1170                            ingress: Ingress::Dns {
1171                                host: hostname!(&format!("peer-{i}.local")),
1172                                port: socket.port(),
1173                            },
1174                            egress: *socket,
1175                        }
1176                    };
1177                    (pk.clone(), addr)
1178                })
1179                .collect();
1180
1181            // Create networks
1182            let (complete_sender, mut complete_receiver) = mpsc::channel(n);
1183            for (i, (private_key, public_key, socket)) in peers_and_sks.iter().enumerate() {
1184                let context = context.child("peer").with_attribute("index", i);
1185
1186                // Create network
1187                let config = Config::test(private_key.clone(), *socket, 1_024 * 1_024);
1188                let (mut network, mut oracle) = Network::new(context.child("network"), config);
1189
1190                // Register peers with mixed addresses
1191                oracle.track(0, Map::try_from(peers.clone()).unwrap());
1192
1193                // Register channel
1194                let (mut sender, mut receiver) =
1195                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1196
1197                network.start();
1198
1199                // Send/Receive messages
1200                let pk = public_key.clone();
1201                context.child("agent").spawn({
1202                    let complete_sender = complete_sender.clone();
1203                    let peers = peers.clone();
1204                    move |context| async move {
1205                        // Wait for messages from other peers
1206                        let receiver = context.child("receiver").spawn(move |_| async move {
1207                            let mut received = HashSet::new();
1208                            while received.len() < n - 1 {
1209                                let (sender, message) = receiver.recv().await.unwrap();
1210                                assert_eq!(message, sender.as_ref());
1211                                received.insert(sender);
1212                            }
1213                            complete_sender.send(()).await.unwrap();
1214
1215                            loop {
1216                                receiver.recv().await.unwrap();
1217                            }
1218                        });
1219
1220                        // Send identity to all peers
1221                        let sender_task =
1222                            context.child("sender").spawn(move |context| async move {
1223                                loop {
1224                                    let mut recipients: Vec<_> = peers
1225                                        .iter()
1226                                        .filter(|(p, _)| p != &pk)
1227                                        .map(|(p, _)| p.clone())
1228                                        .collect();
1229                                    recipients.sort();
1230
1231                                    loop {
1232                                        let mut sent = sender.send(
1233                                            Recipients::All,
1234                                            pk.as_ref().to_vec(),
1235                                            true,
1236                                        );
1237                                        if sent.len() != n - 1 {
1238                                            context.sleep(Duration::from_millis(100)).await;
1239                                            continue;
1240                                        }
1241                                        sent.sort();
1242                                        assert_eq!(sent, recipients);
1243                                        break;
1244                                    }
1245
1246                                    context.sleep(Duration::from_secs(10)).await;
1247                                }
1248                            });
1249
1250                        select! {
1251                            receiver = receiver => {
1252                                panic!("receiver exited: {receiver:?}")
1253                            },
1254                            sender = sender_task => {
1255                                panic!("sender exited: {sender:?}")
1256                            },
1257                        }
1258                    }
1259                });
1260            }
1261
1262            // Wait for all peers to exchange messages
1263            for _ in 0..n {
1264                complete_receiver.recv().await.unwrap();
1265            }
1266
1267            assert_no_rate_limiting(&context.encode());
1268        });
1269    }
1270
1271    #[test_traced]
1272    fn test_dns_resolving_to_private_ip_not_dialed() {
1273        // Test that when allow_private_ips=false, DNS addresses that resolve
1274        // to private IPs are not dialed.
1275        let base_port = 4400;
1276        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1277        executor.start(|context| async move {
1278            let peer0 = ed25519::PrivateKey::from_seed(0);
1279            let peer1 = ed25519::PrivateKey::from_seed(1);
1280
1281            let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
1282            let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
1283
1284            // Register DNS mapping that resolves to localhost (private IP)
1285            context.resolver_register("peer-0.local".to_string(), Some(vec![socket0.ip()]));
1286
1287            // Create peer 0 with allow_private_ips=true
1288            let mut config0 = Config::test(peer0.clone(), socket0, 1_024 * 1_024);
1289            config0.allow_private_ips = true;
1290            let (mut network0, mut oracle0) =
1291                Network::new(context.child("peer").with_attribute("index", 0), config0);
1292
1293            // Peer 0 knows about peer 1 with a socket address
1294            let peers0: Vec<(_, Address)> = vec![
1295                (peer0.public_key(), Address::Symmetric(socket0)),
1296                (peer1.public_key(), Address::Symmetric(socket1)),
1297            ];
1298            oracle0.track(0, Map::try_from(peers0).unwrap());
1299
1300            let (_sender0, mut receiver0) =
1301                network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1302            network0.start();
1303
1304            // Create peer 1 with allow_private_ips=false
1305            let mut config1 = Config::test(peer1.clone(), socket1, 1_024 * 1_024);
1306            config1.allow_private_ips = false; // This should prevent dialing the private IP
1307            let (mut network1, mut oracle1) =
1308                Network::new(context.child("peer").with_attribute("index", 1), config1);
1309
1310            // Peer 1 knows about peer 0 with a DNS address that resolves to private IP
1311            let peers1: Vec<(_, Address)> = vec![
1312                (
1313                    peer0.public_key(),
1314                    Address::Asymmetric {
1315                        ingress: Ingress::Dns {
1316                            host: hostname!("peer-0.local"),
1317                            port: socket0.port(),
1318                        },
1319                        egress: socket0,
1320                    },
1321                ),
1322                (peer1.public_key(), Address::Symmetric(socket1)),
1323            ];
1324            oracle1.track(0, Map::try_from(peers1).unwrap());
1325
1326            let (mut sender1, _receiver1) =
1327                network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1328            network1.start();
1329
1330            // Wait for a period during which peer 1 would normally connect
1331            context.sleep(Duration::from_secs(5)).await;
1332
1333            // Try to send from peer 1 - should not reach anyone since private IPs are blocked
1334            let sent = sender1.send(Recipients::All, peer1.public_key().as_ref().to_vec(), true);
1335            assert!(
1336                sent.is_empty(),
1337                "peer 1 should not have connected to peer 0 (private IP)"
1338            );
1339
1340            // Verify peer 0 received nothing from peer 1
1341            select! {
1342                msg = receiver0.recv() => {
1343                    panic!("peer 0 should not have received any message, got: {msg:?}");
1344                },
1345                _ = context.sleep(Duration::from_secs(1)) => {
1346                    // Expected: timeout with no message
1347                },
1348            }
1349        });
1350    }
1351
1352    #[test_traced]
1353    fn test_dns_mixed_ips_connectivity() {
1354        // Test that peers can connect even when DNS resolves to multiple IPs
1355        // where most are unreachable. The dialer randomly picks an IP, so
1356        // eventually it should pick the reachable one.
1357        //
1358        // Run over 25 seeds to ensure we exercise the random IP selection.
1359        for seed in 0..25 {
1360            let base_port = 3500;
1361
1362            let cfg = deterministic::Config::default()
1363                .with_seed(seed)
1364                .with_timeout(Some(Duration::from_secs(120)));
1365            let executor = deterministic::Runner::new(cfg);
1366            executor.start(|context| async move {
1367                let peer0 = ed25519::PrivateKey::from_seed(0);
1368                let peer1 = ed25519::PrivateKey::from_seed(1);
1369
1370                let good_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
1371                let socket0 = SocketAddr::new(good_ip, base_port);
1372                let socket1 = SocketAddr::new(good_ip, base_port + 1);
1373
1374                // Register DNS mappings with 3 bad IPs and 1 good IP for both peers
1375                let mut all_ips0: Vec<IpAddr> = (1..=3)
1376                    .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 100 + i)))
1377                    .collect();
1378                all_ips0.push(good_ip);
1379                context.resolver_register("peer-0.local", Some(all_ips0));
1380
1381                let mut all_ips1: Vec<IpAddr> = (1..=3)
1382                    .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 110 + i)))
1383                    .collect();
1384                all_ips1.push(good_ip);
1385                context.resolver_register("peer-1.local", Some(all_ips1));
1386
1387                // Create peer addresses - both peers use DNS with mixed IPs
1388                let peers: Vec<(_, Address)> = vec![
1389                    (
1390                        peer0.public_key(),
1391                        Address::Asymmetric {
1392                            ingress: Ingress::Dns {
1393                                host: hostname!("peer-0.local"),
1394                                port: base_port,
1395                            },
1396                            egress: socket0,
1397                        },
1398                    ),
1399                    (
1400                        peer1.public_key(),
1401                        Address::Asymmetric {
1402                            ingress: Ingress::Dns {
1403                                host: hostname!("peer-1.local"),
1404                                port: base_port + 1,
1405                            },
1406                            egress: socket1,
1407                        },
1408                    ),
1409                ];
1410
1411                // Create peer 0
1412                let config0 = Config::test(peer0.clone(), socket0, 1_024 * 1_024);
1413                let (mut network0, mut oracle0) =
1414                    Network::new(context.child("peer").with_attribute("index", 0), config0);
1415                oracle0.track(0, Map::try_from(peers.clone()).unwrap());
1416                let (_sender0, mut receiver0) =
1417                    network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1418                network0.start();
1419
1420                // Create peer 1
1421                let config1 = Config::test(peer1.clone(), socket1, 1_024 * 1_024);
1422                let (mut network1, mut oracle1) =
1423                    Network::new(context.child("peer").with_attribute("index", 1), config1);
1424                oracle1.track(0, Map::try_from(peers.clone()).unwrap());
1425                let (mut sender1, _receiver1) =
1426                    network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1427                network1.start();
1428
1429                // Send until peers connect (may take multiple attempts due to random IP selection).
1430                loop {
1431                    let checked = sender1.check(Recipients::All).unwrap();
1432                    if !checked.recipients().is_empty() {
1433                        checked.send(peer1.public_key().as_ref().to_vec(), true);
1434                    }
1435
1436                    select! {
1437                        result = receiver0.recv() => {
1438                            let (sender, msg) = result.unwrap();
1439                            assert_eq!(sender, peer1.public_key());
1440                            assert_eq!(msg, peer1.public_key().as_ref());
1441                            break;
1442                        },
1443                        _ = context.sleep(Duration::from_millis(100)) => {},
1444                    }
1445                }
1446            });
1447        }
1448    }
1449
1450    #[test_traced]
1451    fn test_many_peer_restart_with_new_address() {
1452        let base_port = 9500;
1453        let n = 5;
1454
1455        let executor = deterministic::Runner::default();
1456        executor.start(|context| async move {
1457            // Create peers
1458            let peers: Vec<_> = (0..n)
1459                .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1460                .collect();
1461            let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1462
1463            // Track senders/receivers/oracles/handles across restarts
1464            let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1465            let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1466            let mut oracles: Vec<Option<Oracle<_>>> = (0..n).map(|_| None).collect();
1467            let mut handles: Vec<Option<commonware_runtime::Handle<()>>> =
1468                (0..n).map(|_| None).collect();
1469
1470            // Track port allocations (updated on restart)
1471            let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1472
1473            // Create initial peer set with addresses
1474            let peer_set: Vec<(_, Address)> = addresses
1475                .iter()
1476                .enumerate()
1477                .map(|(i, pk)| {
1478                    (
1479                        pk.clone(),
1480                        Address::Symmetric(SocketAddr::new(
1481                            IpAddr::V4(Ipv4Addr::LOCALHOST),
1482                            ports[i],
1483                        )),
1484                    )
1485                })
1486                .collect();
1487
1488            // Create networks for all peers
1489            for (i, peer) in peers.iter().enumerate() {
1490                let peer_context = context.child("peer").with_attribute("index", i);
1491
1492                let config = Config::test(
1493                    peer.clone(),
1494                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
1495                    MAX_MESSAGE_SIZE,
1496                );
1497                let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
1498
1499                // Register peer set
1500                oracle.track(0, Map::try_from(peer_set.clone()).unwrap());
1501
1502                let (sender, receiver) =
1503                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1504                senders[i] = Some(sender);
1505                receivers[i] = Some(receiver);
1506                oracles[i] = Some(oracle);
1507
1508                let handle = network.start();
1509                handles[i] = Some(handle);
1510            }
1511
1512            // Wait for full connectivity (each peer can send to all others)
1513            for (i, sender) in senders.iter_mut().enumerate() {
1514                let sender = sender.as_mut().unwrap();
1515                loop {
1516                    let sent = sender.send(
1517                        Recipients::All,
1518                        peers[i].public_key().as_ref().to_vec(),
1519                        true,
1520                    );
1521                    if sent.len() == n - 1 {
1522                        break;
1523                    }
1524                    context.sleep(Duration::from_millis(100)).await;
1525                }
1526            }
1527
1528            // Verify each peer can receive from all others
1529            for receiver in receivers.iter_mut() {
1530                let receiver = receiver.as_mut().unwrap();
1531                let mut received = HashSet::new();
1532                while received.len() < n - 1 {
1533                    let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1534                    assert_eq!(message, sender.as_ref());
1535                    received.insert(sender);
1536                }
1537            }
1538
1539            // Restart each non-first peer with a new port, multiple rounds
1540            let mut restart_counter = 0u16;
1541            for round in 0..3 {
1542                for restart_peer_idx in 1..n {
1543                    // Allocate a new unique port
1544                    restart_counter += 1;
1545                    let new_port = base_port + 100 + restart_counter;
1546                    ports[restart_peer_idx] = new_port;
1547
1548                    // Abort the peer's network
1549                    if let Some(handle) = handles[restart_peer_idx].take() {
1550                        handle.abort();
1551                    }
1552                    senders[restart_peer_idx] = None;
1553                    receivers[restart_peer_idx] = None;
1554                    oracles[restart_peer_idx] = None;
1555
1556                    // Create updated peer set with new port
1557                    let updated_peer_set: Vec<(_, Address)> = addresses
1558                        .iter()
1559                        .enumerate()
1560                        .map(|(i, pk)| {
1561                            (
1562                                pk.clone(),
1563                                Address::Symmetric(SocketAddr::new(
1564                                    IpAddr::V4(Ipv4Addr::LOCALHOST),
1565                                    ports[i],
1566                                )),
1567                            )
1568                        })
1569                        .collect();
1570
1571                    // Update oracle on all running peers
1572                    for oracle in oracles.iter_mut().flatten() {
1573                        oracle.track(
1574                            (round * (n - 1) + restart_peer_idx) as u64,
1575                            Map::try_from(updated_peer_set.clone()).unwrap(),
1576                        );
1577                    }
1578
1579                    // Restart the peer with new port
1580                    let peer_context = context
1581                        .child("peer_round")
1582                        .with_attribute("index", restart_peer_idx)
1583                        .with_attribute("round", round);
1584                    let config = Config::test(
1585                        peers[restart_peer_idx].clone(),
1586                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), new_port),
1587                        MAX_MESSAGE_SIZE,
1588                    );
1589                    let (mut network, mut oracle) =
1590                        Network::new(peer_context.child("network"), config);
1591
1592                    oracle.track(
1593                        (round * (n - 1) + restart_peer_idx) as u64,
1594                        Map::try_from(updated_peer_set.clone()).unwrap(),
1595                    );
1596
1597                    let (sender, receiver) = network.register(
1598                        0,
1599                        Quota::per_second(NZU32!(100)),
1600                        DEFAULT_MESSAGE_BACKLOG,
1601                    );
1602                    senders[restart_peer_idx] = Some(sender);
1603                    receivers[restart_peer_idx] = Some(receiver);
1604                    oracles[restart_peer_idx] = Some(oracle);
1605
1606                    let handle = network.start();
1607                    handles[restart_peer_idx] = Some(handle);
1608
1609                    // Wait for the restarted peer to reconnect to all others
1610                    let restarted_sender = senders[restart_peer_idx].as_mut().unwrap();
1611                    loop {
1612                        let sent = restarted_sender.send(
1613                            Recipients::All,
1614                            peers[restart_peer_idx].public_key().as_ref().to_vec(),
1615                            true,
1616                        );
1617                        if sent.len() == n - 1 {
1618                            break;
1619                        }
1620                        context.sleep(Duration::from_millis(100)).await;
1621                    }
1622
1623                    // Verify all other peers can send to the restarted peer
1624                    for i in 0..n {
1625                        if i == restart_peer_idx {
1626                            continue;
1627                        }
1628                        let sender = senders[i].as_mut().unwrap();
1629                        loop {
1630                            let sent = sender.send(
1631                                Recipients::One(addresses[restart_peer_idx].clone()),
1632                                peers[i].public_key().as_ref().to_vec(),
1633                                true,
1634                            );
1635                            if sent.len() == 1 {
1636                                break;
1637                            }
1638                            context.sleep(Duration::from_millis(100)).await;
1639                        }
1640                    }
1641
1642                    // Verify the restarted peer receives from all others
1643                    let restarted_receiver = receivers[restart_peer_idx].as_mut().unwrap();
1644                    let mut received = HashSet::new();
1645                    while received.len() < n - 1 {
1646                        let (sender, message): (ed25519::PublicKey, _) =
1647                            restarted_receiver.recv().await.unwrap();
1648                        assert_eq!(message, sender.as_ref());
1649                        received.insert(sender);
1650                    }
1651                }
1652            }
1653
1654            assert_no_rate_limiting(&context.encode());
1655        });
1656    }
1657
1658    #[test_traced]
1659    fn test_simultaneous_peer_restart() {
1660        let base_port = 9700;
1661        let n = 5;
1662
1663        let executor = deterministic::Runner::default();
1664        executor.start(|context| async move {
1665            // Create peers
1666            let peers: Vec<_> = (0..n)
1667                .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1668                .collect();
1669            let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1670
1671            // Track port allocations (updated on restart)
1672            let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1673
1674            // Create initial peer set with addresses
1675            let peer_set: Vec<(_, Address)> = addresses
1676                .iter()
1677                .enumerate()
1678                .map(|(i, pk)| {
1679                    (
1680                        pk.clone(),
1681                        Address::Symmetric(SocketAddr::new(
1682                            IpAddr::V4(Ipv4Addr::LOCALHOST),
1683                            ports[i],
1684                        )),
1685                    )
1686                })
1687                .collect();
1688
1689            // Track senders/receivers/oracles/handles across restarts
1690            let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1691            let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1692            let mut oracles: Vec<Option<Oracle<_>>> = (0..n).map(|_| None).collect();
1693            let mut handles: Vec<Option<commonware_runtime::Handle<()>>> =
1694                (0..n).map(|_| None).collect();
1695
1696            // Create networks for all peers
1697            for (i, peer) in peers.iter().enumerate() {
1698                let peer_context = context.child("peer").with_attribute("index", i);
1699
1700                let config = Config::test(
1701                    peer.clone(),
1702                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
1703                    MAX_MESSAGE_SIZE,
1704                );
1705                let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
1706
1707                // Register peer set
1708                oracle.track(0, Map::try_from(peer_set.clone()).unwrap());
1709
1710                let (sender, receiver) =
1711                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1712                senders[i] = Some(sender);
1713                receivers[i] = Some(receiver);
1714                oracles[i] = Some(oracle);
1715
1716                let handle = network.start();
1717                handles[i] = Some(handle);
1718            }
1719
1720            // Wait for full connectivity
1721            for (i, sender) in senders.iter_mut().enumerate() {
1722                let sender = sender.as_mut().unwrap();
1723                loop {
1724                    let sent = sender.send(
1725                        Recipients::All,
1726                        peers[i].public_key().as_ref().to_vec(),
1727                        true,
1728                    );
1729                    if sent.len() == n - 1 {
1730                        break;
1731                    }
1732                    context.sleep(Duration::from_millis(100)).await;
1733                }
1734            }
1735
1736            // Verify each peer can receive from all others
1737            for receiver in receivers.iter_mut() {
1738                let receiver = receiver.as_mut().unwrap();
1739                let mut received = HashSet::new();
1740                while received.len() < n - 1 {
1741                    let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1742                    assert_eq!(message, sender.as_ref());
1743                    received.insert(sender);
1744                }
1745            }
1746
1747            // Shutdown all peers except peer 0 simultaneously.
1748            //
1749            // We keep peer 0 alive to exercise the case where multiple
1750            // peers churn at once.
1751            let restart_peers: Vec<usize> = (1..n).collect();
1752            for &idx in &restart_peers {
1753                if let Some(handle) = handles[idx].take() {
1754                    handle.abort();
1755                }
1756                senders[idx] = None;
1757                receivers[idx] = None;
1758                oracles[idx] = None;
1759                ports[idx] = base_port + 100 + idx as u16;
1760            }
1761
1762            // Wait for connections to be detected as closed
1763            context.sleep(Duration::from_secs(2)).await;
1764
1765            // Create updated peer set with new ports
1766            let updated_peer_set: Vec<(_, Address)> = addresses
1767                .iter()
1768                .enumerate()
1769                .map(|(i, pk)| {
1770                    (
1771                        pk.clone(),
1772                        Address::Symmetric(SocketAddr::new(
1773                            IpAddr::V4(Ipv4Addr::LOCALHOST),
1774                            ports[i],
1775                        )),
1776                    )
1777                })
1778                .collect();
1779
1780            // Update oracle on peer 0 (the only running peer)
1781            oracles[0]
1782                .as_mut()
1783                .unwrap()
1784                .track(1, Map::try_from(updated_peer_set.clone()).unwrap());
1785
1786            // Restart all shutdown peers with new ports
1787            for &idx in &restart_peers {
1788                let peer_context = context.child("peer_restarted").with_attribute("index", idx);
1789                let config = Config::test(
1790                    peers[idx].clone(),
1791                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[idx]),
1792                    MAX_MESSAGE_SIZE,
1793                );
1794                let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
1795
1796                oracle.track(1, Map::try_from(updated_peer_set.clone()).unwrap());
1797
1798                let (sender, receiver) =
1799                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1800                senders[idx] = Some(sender);
1801                receivers[idx] = Some(receiver);
1802                oracles[idx] = Some(oracle);
1803
1804                let handle = network.start();
1805                handles[idx] = Some(handle);
1806            }
1807
1808            // Wait for full connectivity after restart
1809            for (i, sender) in senders.iter_mut().enumerate() {
1810                let sender = sender.as_mut().unwrap();
1811                loop {
1812                    let sent = sender.send(
1813                        Recipients::All,
1814                        peers[i].public_key().as_ref().to_vec(),
1815                        true,
1816                    );
1817                    if sent.len() == n - 1 {
1818                        break;
1819                    }
1820                    context.sleep(Duration::from_millis(100)).await;
1821                }
1822            }
1823
1824            // Verify all peers can receive from all others after restart
1825            for receiver in receivers.iter_mut() {
1826                let receiver = receiver.as_mut().unwrap();
1827                let mut received = HashSet::new();
1828                while received.len() < n - 1 {
1829                    let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1830                    assert_eq!(message, sender.as_ref());
1831                    received.insert(sender);
1832                }
1833            }
1834
1835            assert_no_rate_limiting(&context.encode());
1836        });
1837    }
1838    #[test_traced]
1839    fn test_operations_after_shutdown_do_not_panic() {
1840        let executor = deterministic::Runner::default();
1841        executor.start(|context| async move {
1842            let peer = ed25519::PrivateKey::from_seed(0);
1843            let address = peer.public_key();
1844
1845            let peer_context = context.child("peer");
1846            let config = Config::test(
1847                peer.clone(),
1848                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200),
1849                MAX_MESSAGE_SIZE,
1850            );
1851            let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
1852
1853            // Register channel and peer set
1854            let (mut sender, _receiver) =
1855                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1856            let peer_addr =
1857                Address::Symmetric(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200));
1858            let peers: Map<ed25519::PublicKey, Address> =
1859                vec![(address.clone(), peer_addr)].try_into().unwrap();
1860            oracle.track(0, peers.clone());
1861
1862            // Start and immediately abort the network
1863            let handle = network.start();
1864            handle.abort();
1865
1866            // Wait for shutdown to propagate
1867            context.sleep(Duration::from_millis(100)).await;
1868
1869            // Oracle operations should not panic even after shutdown
1870            oracle.track(1, peers.clone());
1871            let _ = oracle.peer_set(0).await;
1872            let _ = oracle.subscribe().await;
1873            crate::block_peer(&mut oracle, address.clone());
1874
1875            // Sender operations should not panic even after shutdown
1876            let sent = sender.send(Recipients::All, address.as_ref().to_vec(), true);
1877            assert!(sent.is_empty());
1878        });
1879    }
1880
1881    fn clean_shutdown(seed: u64) {
1882        let cfg = deterministic::Config::default()
1883            .with_seed(seed)
1884            .with_timeout(Some(Duration::from_secs(30)));
1885        let executor = deterministic::Runner::new(cfg);
1886        executor.start(|context| async move {
1887            let peer = ed25519::PrivateKey::from_seed(0);
1888
1889            let peer_context = context.child("peer");
1890            let config = Config::test(
1891                peer.clone(),
1892                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200),
1893                MAX_MESSAGE_SIZE,
1894            );
1895            let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
1896
1897            // Register channel and peer set
1898            let (_, _) =
1899                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1900            let peer_addr =
1901                Address::Symmetric(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200));
1902            let peers: Map<ed25519::PublicKey, Address> =
1903                vec![(peer.public_key(), peer_addr)].try_into().unwrap();
1904            oracle.track(0, peers);
1905
1906            // Start the network
1907            let handle = network.start();
1908
1909            // Allow tasks to start
1910            context.sleep(Duration::from_millis(100)).await;
1911
1912            // Count running tasks under the network prefix
1913            let running_before = count_running_tasks(&context, "peer_network");
1914            assert!(
1915                running_before > 0,
1916                "at least one network task should be running"
1917            );
1918
1919            // Abort the network
1920            handle.abort();
1921            let _ = handle.await;
1922
1923            // Give the runtime a tick to process aborts
1924            context.sleep(Duration::from_millis(100)).await;
1925
1926            // Verify all network tasks are stopped
1927            let running_after = count_running_tasks(&context, "peer_network");
1928            assert_eq!(
1929                running_after, 0,
1930                "all network tasks should be stopped, but {running_after} still running"
1931            );
1932        });
1933    }
1934
1935    #[test_traced]
1936    fn test_clean_shutdown() {
1937        for seed in 0..25 {
1938            clean_shutdown(seed);
1939        }
1940    }
1941
1942    fn duplicate_addresses_disconnected(seed: u64) {
1943        let base_port = 6000;
1944        let executor = deterministic::Runner::seeded(seed);
1945        executor.start(|context| async move {
1946            let peer0 = ed25519::PrivateKey::from_seed(0);
1947            let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
1948            let wrong_socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 100);
1949            let peer1 = ed25519::PrivateKey::from_seed(1);
1950            let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
1951            let peer2 = ed25519::PrivateKey::from_seed(2);
1952
1953            // Start peer 0
1954            let config0 = Config::test(peer0.clone(), socket0, MAX_MESSAGE_SIZE);
1955            let (mut network0, mut oracle0) =
1956                Network::new(context.child("peer").with_attribute("index", 0), config0);
1957            let (mut sender0, _receiver0) =
1958                network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1959            network0.start();
1960
1961            // Create peer set with an incorrect address for peer 2
1962            let peer_set0: Vec<(_, Address)> = vec![
1963                (peer0.public_key(), Address::Symmetric(socket0)),
1964                (peer1.public_key(), Address::Symmetric(socket1)),
1965                (peer2.public_key(), Address::Symmetric(socket1)),
1966            ];
1967            oracle0.track(0, Map::try_from(peer_set0).unwrap());
1968
1969            // Wait for connections to be attempted
1970            context.sleep(Duration::from_secs(30)).await;
1971
1972            // Peer 0 can't send to anyone
1973            let sent = sender0.send(Recipients::All, peer1.public_key().as_ref().to_vec(), true);
1974            assert!(sent.is_empty());
1975
1976            // Start peer 1 (has duplicate but correct address)
1977            let config1 = Config::test(peer1.clone(), socket1, MAX_MESSAGE_SIZE);
1978            let (mut network1, mut oracle1) =
1979                Network::new(context.child("peer").with_attribute("index", 1), config1);
1980            let (_sender1, mut receiver1) =
1981                network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1982            network1.start();
1983
1984            // Create peer set where peer 0 must dial peer 1 for connection
1985            let peer_set1: Vec<(_, Address)> = vec![
1986                (peer0.public_key(), Address::Symmetric(wrong_socket0)),
1987                (peer1.public_key(), Address::Symmetric(socket1)),
1988                (peer2.public_key(), Address::Symmetric(socket1)),
1989            ];
1990            oracle1.track(0, Map::try_from(peer_set1).unwrap());
1991
1992            // Wait for connections to be made
1993            context.sleep(Duration::from_secs(30)).await;
1994
1995            // Now peer 0 should connect to peer 1 at correct address
1996            loop {
1997                let sent =
1998                    sender0.send(Recipients::All, peer0.public_key().as_ref().to_vec(), true);
1999                if sent.len() == 1 {
2000                    assert_eq!(sent[0], peer1.public_key());
2001                    break;
2002                }
2003                context.sleep(Duration::from_millis(100)).await;
2004            }
2005            let (sender, _) = receiver1.recv().await.unwrap();
2006            assert_eq!(sender, peer0.public_key());
2007        });
2008    }
2009
2010    #[test_traced]
2011    fn test_duplicate_addresses_disconnected() {
2012        // Ensure different dial orders explored by running with different seeds
2013        for seed in 0..25 {
2014            duplicate_addresses_disconnected(seed);
2015        }
2016    }
2017
2018    #[test_traced]
2019    fn test_duplicate_addresses_connected() {
2020        let base_port = 6000;
2021        let executor = deterministic::Runner::default();
2022        executor.start(|context| async move {
2023            let peer0 = ed25519::PrivateKey::from_seed(0);
2024            let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
2025            let wrong_socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 100);
2026            let peer1 = ed25519::PrivateKey::from_seed(1);
2027            let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
2028            let peer2 = ed25519::PrivateKey::from_seed(2);
2029            let socket2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 2);
2030
2031            // Start peer 0
2032            let config0 = Config::test(peer0.clone(), socket0, MAX_MESSAGE_SIZE);
2033            let (mut network0, mut oracle0) =
2034                Network::new(context.child("peer").with_attribute("index", 0), config0);
2035            let (mut sender0, mut receiver0) =
2036                network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2037            network0.start();
2038
2039            // Start peer 2
2040            let config2 = Config::test(peer2.clone(), socket2, MAX_MESSAGE_SIZE);
2041            let (mut network2, mut oracle2) =
2042                Network::new(context.child("peer").with_attribute("index", 2), config2);
2043            let (_sender2, mut receiver2) =
2044                network2.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2045            network2.start();
2046
2047            // Create peer set with an incorrect address for peer 2
2048            let peer_set: Vec<(_, Address)> = vec![
2049                (peer0.public_key(), Address::Symmetric(socket0)),
2050                (peer1.public_key(), Address::Symmetric(socket1)),
2051                (peer2.public_key(), Address::Symmetric(socket1)),
2052            ];
2053            oracle0.track(0, Map::try_from(peer_set.clone()).unwrap());
2054            oracle2.track(0, Map::try_from(peer_set).unwrap());
2055
2056            // Wait for connections to be made
2057            context.sleep(Duration::from_secs(30)).await;
2058
2059            // Peer 0 can send to peer 2
2060            loop {
2061                let sent =
2062                    sender0.send(Recipients::All, peer2.public_key().as_ref().to_vec(), true);
2063                if sent.len() == 1 {
2064                    assert_eq!(sent[0], peer2.public_key());
2065                    break;
2066                }
2067                context.sleep(Duration::from_millis(100)).await;
2068            }
2069            let (sender, _) = receiver2.recv().await.unwrap();
2070            assert_eq!(sender, peer0.public_key());
2071
2072            // Start peer 1 (has duplicate but correct address)
2073            let config1 = Config::test(peer1.clone(), socket1, MAX_MESSAGE_SIZE);
2074            let (mut network1, mut oracle1) =
2075                Network::new(context.child("peer").with_attribute("index", 1), config1);
2076            let (mut sender1, _receiver1) =
2077                network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2078            network1.start();
2079
2080            // Create peer set where peer 0 must dial peer 1 for connection
2081            let peer_set1: Vec<(_, Address)> = vec![
2082                (peer0.public_key(), Address::Symmetric(wrong_socket0)),
2083                (peer1.public_key(), Address::Symmetric(socket1)),
2084                (peer2.public_key(), Address::Symmetric(socket1)),
2085            ];
2086            oracle1.track(0, Map::try_from(peer_set1).unwrap());
2087
2088            // Wait for connections to be made
2089            context.sleep(Duration::from_secs(30)).await;
2090
2091            // Now peer 0 should connect to peer 1 at correct address and peer 2 should dial peer 1
2092            loop {
2093                let sent =
2094                    sender1.send(Recipients::All, peer1.public_key().as_ref().to_vec(), true);
2095                if sent.len() == 2 {
2096                    assert!(sent.contains(&peer0.public_key()));
2097                    assert!(sent.contains(&peer2.public_key()));
2098                    break;
2099                }
2100                context.sleep(Duration::from_millis(100)).await;
2101            }
2102            let mut received0 = false;
2103            while let Ok((sender, _)) = receiver0.recv().await {
2104                // May have some items around from the initial send
2105                if sender == peer1.public_key() {
2106                    received0 = true;
2107                    break;
2108                }
2109            }
2110            assert!(received0);
2111            let (sender, _) = receiver2.recv().await.unwrap();
2112            assert_eq!(sender, peer1.public_key());
2113        });
2114    }
2115
2116    #[test]
2117    fn test_broadcast_slow_peer_no_blocking() {
2118        let executor = deterministic::Runner::default();
2119        executor.start(|context| async move {
2120            let cfg = RouterConfig {
2121                mailbox_size: NZUsize!(1),
2122            };
2123            let (router, mailbox, messenger) =
2124                RouterActor::<_, ed25519::PublicKey>::new(context.child("router"), cfg);
2125
2126            let channels = channels::Channels::new(messenger.clone(), MAX_MESSAGE_SIZE);
2127            let _handle = router.start(channels);
2128
2129            let slow_peer = ed25519::PrivateKey::from_seed(0).public_key();
2130            let (slow_relay, _slow_receivers) =
2131                Relay::new(context.child("slow_relay"), NZUsize!(10));
2132            assert!(
2133                mailbox.ready(slow_peer, slow_relay).await.is_some(),
2134                "Failed to register slow peer"
2135            );
2136
2137            let fast_peer = ed25519::PrivateKey::from_seed(1).public_key();
2138            let (fast_relay, mut fast_receivers) =
2139                Relay::new(context.child("fast_relay"), NZUsize!(100));
2140            assert!(
2141                mailbox.ready(fast_peer, fast_relay).await.is_some(),
2142                "Failed to register fast peer"
2143            );
2144
2145            let message = IoBuf::from(vec![0u8; 100]);
2146            for i in 0..11 {
2147                let sent = messenger.content(Recipients::All, 0, message.clone().into(), false);
2148                assert_ne!(
2149                    sent,
2150                    Unreliable::new(Feedback::Closed),
2151                    "Broadcast {i} should be accepted"
2152                );
2153
2154                assert!(fast_receivers.low.recv().await.is_some());
2155            }
2156        });
2157    }
2158}