commonware_p2p/authenticated/discovery/
mod.rs

//! Communicate with a fixed set of authenticated peers without known addresses over encrypted connections.
//!
//! `discovery` provides multiplexed communication between fully-connected peers
//! identified by a developer-specified cryptographic identity (e.g. BLS, ed25519).
//! Peer discovery occurs automatically using ordered bit vectors (sorted by authorized
//! cryptographic identities) to efficiently communicate knowledge of dialable peers.
//!
//! # Features
//!
//! - Configurable Cryptography Scheme for Peer Identities (BLS, ed25519, etc.)
//! - Automatic Peer Discovery Using Bit Vectors (Also Used as Ping Messages)
//! - Multiplexing With Configurable Rate Limiting Per Channel and Send Prioritization
//!
//! # Design
//!
//! ## Discovery
//!
//! Peer discovery operates under the assumption that all peers are aware of and synchronized on
//! the composition of peer sets at specific, user-provided indices (`u64`). Each index maps to a
//! list of authorized `PublicKey`s (`(u64, Vec<PublicKey>)`). Based on this shared knowledge, each
//! peer can construct a sorted bit vector message (`BitVec`) representing its knowledge of the
//! dialable addresses ([std::net::SocketAddr]) for the peers in that set.
//!
//! Registration happens via [`Manager::track`](crate::Manager::track), which accepts either a list of
//! primary peers or a [`TrackedPeers`](crate::TrackedPeers) value containing both primary and
//! secondary peers.
//!
//! The `BitVec` message contains:
//! - `index`: The `u64` index the bit vector applies to.
//! - `bits`: The bit vector itself, where a '1' signifies knowledge of the corresponding
//!   peer's address in the sorted list for that index.
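//!
//! As a rough illustration of how such a vector could be derived (a sketch only: `knowledge_bits`
//! and the 4-byte `PublicKey` alias are hypothetical and not part of this crate), each bit can be
//! computed by walking the sorted peer set and checking whether an address is known:
//!
//! ```rust
//! use std::collections::{BTreeSet, HashMap};
//! use std::net::SocketAddr;
//!
//! // Hypothetical stand-in for a peer's public key.
//! type PublicKey = [u8; 4];
//!
//! // Position `i` is `true` iff an address is known for the `i`-th key in sorted order.
//! fn knowledge_bits(
//!     peer_set: &BTreeSet<PublicKey>,
//!     known: &HashMap<PublicKey, SocketAddr>,
//! ) -> Vec<bool> {
//!     // `BTreeSet` iterates in ascending order, so every peer that agrees on the
//!     // set derives the same position for each key.
//!     peer_set.iter().map(|key| known.contains_key(key)).collect()
//! }
//! ```
//!
//! Because positions depend only on the sorted set, two peers that agree on the set at an index
//! interpret each bit identically.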
//!
//! Even if a peer knows an address for a member of the set, it assumes that address is stale after
//! a (configurable) number of failed dial attempts and marks it as '0' (unknown) in the bit
//! vector. This prevents peers from being "stuck" with outdated information about other peers.
//!
//! _Warning: If peers are not synchronized on the peer set composition at a given index,
//! discovery messages can be misinterpreted. A peer might associate a bit position with the
//! wrong peer or fail to parse the vector if its length doesn't match the expected set size. The
//! application layer is responsible for ensuring peer set synchronization._
//!
//! Due to their small size, these `BitVec` messages are exchanged periodically (configured by
//! `gossip_bit_vec_frequency` in the [Config]) between connected peers. This serves as both a
//! peer discovery mechanism and a keep-alive "ping" message to maintain the underlying
//! connection, especially during periods of low application-level traffic. The protocol supports
//! tracking multiple peer sets concurrently (up to `tracked_peer_sets`), each identified by its
//! `index`. This is useful, for instance, during transitions like distributed key generation
//! (DKG) where connections to both old and new peer sets are needed simultaneously.
//!
//! Secondary peers remain visible in [`PeerSetUpdate`](crate::PeerSetUpdate) notifications and can
//! use established transport connections, including discovery gossip once connected, but outbound
//! dialing and the gossip bit-vector namespace are restricted to primary peers.
//!
//! Upon receiving a `BitVec` message, a peer compares it against its own knowledge for the same
//! index. If the receiving peer knows addresses that the sender marked as '0' (unknown), it
//! selects a random subset of these known `Info` structures (up to `peer_gossip_max_count`)
//! and sends them back in a `Payload::Peers` message. To save bandwidth, peers will only gossip
//! `Info` for peers that they currently have a connection with. This prevents them from
//! repeatedly sending `Info` that they cannot verify is still valid. Each `Info` contains:
//! - `socket`: The [std::net::SocketAddr] of the peer.
//! - `timestamp`: A `u64` timestamp indicating when the address was attested.
//! - `public_key`: The peer's public key.
//! - `signature`: The peer's cryptographic signature over the `socket` and `timestamp`.
//!
//! If the receiver doesn't know any addresses the sender is unaware of, it sends no
//! `Payload::Peers` response; the received `BitVec` implicitly acts as a "pong".
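//!
//! The comparison described above can be sketched as follows. This is an illustrative model, not
//! the crate's implementation: `positions_to_gossip` is a hypothetical helper, and it takes the
//! first `max_count` candidates where the real protocol selects a random subset.
//!
//! ```rust
//! // Return the positions whose `Info` could be sent back to the sender of `theirs`,
//! // or `None` if the vectors cannot be compared.
//! fn positions_to_gossip(
//!     ours: &[bool],
//!     theirs: &[bool],
//!     connected: &[bool],
//!     max_count: usize,
//! ) -> Option<Vec<usize>> {
//!     // A length mismatch means the two peers disagree on the set at this index.
//!     if ours.len() != theirs.len() || ours.len() != connected.len() {
//!         return None;
//!     }
//!     let candidates = (0..ours.len())
//!         // We know the address, the sender does not, and we are currently
//!         // connected to that peer.
//!         .filter(|&i| ours[i] && !theirs[i] && connected[i])
//!         // Simplification: keep the first `max_count` instead of a random subset.
//!         .take(max_count)
//!         .collect();
//!     Some(candidates)
//! }
//! ```
//!
//! An empty result corresponds to the case where no `Payload::Peers` response is sent.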
//!
//! If a peer receives an `Info` message (either directly or through gossip) containing a more
//! recent timestamp for a known peer's address, it updates its local `Record`. This updated
//! `Info` is also used in future gossip messages. Each peer generates its own signed
//! `Info` upon startup and sends it immediately after establishing a connection (following
//! the cryptographic handshake). This ensures that if a peer connects using an outdated address
//! record, it will be corrected promptly by the peer being dialed.
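//!
//! A minimal sketch of the "newer timestamp wins" rule, under stated assumptions: the `Info`
//! below is a simplified stand-in (the real message also carries `public_key` and `signature`,
//! and signature verification is omitted here), and `update_record` is a hypothetical helper.
//!
//! ```rust
//! use std::net::SocketAddr;
//!
//! // Simplified stand-in for the `Info` message.
//! #[derive(Clone, Debug, PartialEq)]
//! struct Info {
//!     socket: SocketAddr,
//!     timestamp: u64,
//! }
//!
//! // Replace `current` with `incoming` only if `incoming` is strictly newer.
//! // Returns `true` when the stored value changed.
//! fn update_record(current: &mut Option<Info>, incoming: Info) -> bool {
//!     let is_newer = current
//!         .as_ref()
//!         .map_or(true, |existing| incoming.timestamp > existing.timestamp);
//!     if is_newer {
//!         *current = Some(incoming);
//!     }
//!     is_newer
//! }
//! ```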
//!
//! To initiate the discovery process, a peer needs a list of `bootstrappers` (defined in
//! [Config]) - known peer public keys and their corresponding socket addresses. The peer
//! attempts to dial these bootstrappers, performs the handshake, sends its own `Info`, and
//! then sends a `BitVec` for the relevant peer set(s) (initially only knowing its own address,
//! marked as '1'). It then waits for responses, learning about other peers through the
//! `Payload::Peers` messages received. Bootstrapper information is persisted, and connections to
//! them are maintained even if they aren't part of any tracked peer sets. Different
//! peers can have different bootstrapper lists.
//!
//! _Note: If a peer (listener) receives a connection request from another peer (dialer) that
//! belongs to a tracked peer set, the listener will accept the connection, even if the
//! listener itself hasn't yet learned about that specific peer set (or has an older version). The
//! core requirement is that the listener recognizes the *dialer's public key* as belonging to
//! *some* authorized set it tracks (see `actors::tracker::Actor`). This mechanism allows peers
//! with more up-to-date peer set information to connect and propagate that information, enabling
//! the listener to potentially learn about newer sets it is part of._
//!
//! ## Messages
//!
//! Application-level data is exchanged using the `Payload::Data` message type, which wraps an
//! internal `Data` structure. This structure contains:
//! - `channel`: A `u32` identifier used to route the message to the correct application handler.
//! - `message`: The arbitrary application payload as `IoBuf`.
//!
//! The size of the `message` bytes must not exceed the configured `max_message_size`. If it does,
//! the sending operation will panic. Messages can be sent with `priority`, allowing certain
//! communications to potentially bypass lower-priority messages waiting in send queues across all
//! channels. Each registered channel ([Sender], [Receiver]) handles its own message queuing
//! and rate limiting.
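//!
//! One way to picture the effect of `priority` is a send queue with two lanes, where the priority
//! lane is drained first. This is only a conceptual model: `SendQueue` is hypothetical, and it
//! returns `false` for oversized messages where the real sender panics.
//!
//! ```rust
//! use std::collections::VecDeque;
//!
//! // Hypothetical per-peer send queue with a priority lane and a regular lane.
//! #[derive(Default)]
//! struct SendQueue {
//!     high: VecDeque<Vec<u8>>,
//!     low: VecDeque<Vec<u8>>,
//! }
//!
//! impl SendQueue {
//!     // Reject messages above the limit, then enqueue in the matching lane.
//!     fn push(&mut self, message: Vec<u8>, priority: bool, max_message_size: usize) -> bool {
//!         if message.len() > max_message_size {
//!             return false;
//!         }
//!         if priority {
//!             self.high.push_back(message);
//!         } else {
//!             self.low.push_back(message);
//!         }
//!         true
//!     }
//!
//!     // Drain the priority lane before any non-priority message.
//!     fn pop(&mut self) -> Option<Vec<u8>> {
//!         self.high.pop_front().or_else(|| self.low.pop_front())
//!     }
//! }
//! ```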
//!
//! ## Compression
//!
//! Stream compression is not provided at the transport layer to avoid inadvertently
//! enabling known attacks such as BREACH and CRIME. These attacks exploit the interaction
//! between compression and encryption by analyzing patterns in the resulting data.
//! By compressing secrets alongside attacker-controlled content, these attacks can infer
//! sensitive information through compression ratio analysis. Applications that choose
//! to compress data should do so with full awareness of these risks and implement
//! appropriate mitigations (such as ensuring no attacker-controlled data is compressed
//! alongside sensitive information).
//!
//! ## Batching
//!
//! Applications seeking higher performance should prefer batching messages
//! above `p2p`. Larger application-level batches amortize per-message
//! encryption overhead and, if the application also compresses its payloads,
//! can improve compression ratio.
//!
//! ## Rate Limiting
//!
//! There are five primary rate limits:
//!
//! - `max_concurrent_handshakes`: The maximum number of concurrent handshake attempts allowed.
//! - `allowed_handshake_rate_per_ip`: The rate limit for handshake attempts originating from a single IP address.
//! - `allowed_handshake_rate_per_subnet`: The rate limit for handshake attempts originating from a single IP subnet.
//! - `peer_connection_cooldown`: The per-peer rate limit for inbound and outbound connection reservations, expressed as a minimum cooldown between attempts.
//! - `rate` (per channel): The rate limit for messages sent on a single channel.
//!
//! _Users should consider these rate limits as best-effort protection against moderate abuse. Targeted abuse (e.g. DDoS)
//! must be mitigated with an external proxy (that limits inbound connection attempts to authorized IPs)._
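//!
//! To make the "minimum cooldown between attempts" semantics of `peer_connection_cooldown`
//! concrete, here is a small sketch. `may_reserve` is a hypothetical helper written against
//! `std::time`; the crate's own bookkeeping may differ.
//!
//! ```rust
//! use std::time::{Duration, Instant};
//!
//! // Allow a new connection reservation for a peer only if at least `cooldown`
//! // has elapsed since the previous attempt (or if there was none).
//! fn may_reserve(last_attempt: Option<Instant>, now: Instant, cooldown: Duration) -> bool {
//!     match last_attempt {
//!         Some(last) => now.saturating_duration_since(last) >= cooldown,
//!         None => true,
//!     }
//! }
//! ```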
//!
//! ## IP Poisoning
//!
//! A malicious peer can claim an ingress [std::net::SocketAddr] that collides with an honest peer, drawing invalid dial attempts to
//! the honest peer (where we expect the malicious public key rather than the honest public key).
//!
//! Because we rate limit inbound connection attempts per IP/subnet, this poisoning can lead to us dropping legitimate
//! dial attempts (if quota was already exhausted on useless dial attempts). Recall, an honest dialer doesn't know which public
//! key actually resides at an address and must try all that collide.
//!
//! To mitigate this issue, we shuffle peer dial order on each dial queue refresh. This ensures we eventually dial a poisoned
//! IP with the correct public key before hitting the rate limit imposed by the listener at said IP.
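//!
//! The shuffle itself can be as simple as a Fisher-Yates pass over the dial queue. The sketch
//! below is dependency-free for illustration only: `XorShift64` and `shuffle_dial_queue` are
//! hypothetical, and a real implementation would draw randomness from the runtime's RNG.
//!
//! ```rust
//! // Tiny xorshift64 generator (seed must be non-zero), used only to keep the sketch
//! // free of external crates.
//! struct XorShift64(u64);
//!
//! impl XorShift64 {
//!     fn next(&mut self) -> u64 {
//!         let mut x = self.0;
//!         x ^= x << 13;
//!         x ^= x >> 7;
//!         x ^= x << 17;
//!         self.0 = x;
//!         x
//!     }
//! }
//!
//! // Fisher-Yates shuffle of the dial queue, performed on each refresh.
//! fn shuffle_dial_queue<T>(queue: &mut [T], rng: &mut XorShift64) {
//!     for i in (1..queue.len()).rev() {
//!         // Pick `j` in `0..=i` (modulo bias is acceptable for an illustration).
//!         let j = (rng.next() % (i as u64 + 1)) as usize;
//!         queue.swap(i, j);
//!     }
//! }
//! ```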
//!
//! _If you want to entirely prevent this class of attack, consider migrating to [crate::authenticated::lookup]._
//!
//! ## Message Delivery
//!
//! Outgoing message submissions can be rejected when a peer's send buffer is full, preventing slow
//! peers from blocking sends to other peers. Incoming messages are dropped when the application's
//! receive buffer is full, ensuring protocol messages (BitVec, Peers) continue to flow and
//! connections remain healthy.
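//!
//! The "reject instead of block" behavior can be modeled with a bounded channel and a
//! non-blocking send. This sketch uses `std::sync::mpsc` purely for illustration; `submit` and
//! `bounded_buffer` are hypothetical names, and the crate's internal channels may differ.
//!
//! ```rust
//! use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
//!
//! // Create a buffer that holds at most `backlog` pending messages.
//! fn bounded_buffer(backlog: usize) -> (SyncSender<Vec<u8>>, Receiver<Vec<u8>>) {
//!     sync_channel(backlog)
//! }
//!
//! // Try to enqueue `message` without blocking. Returns `false` if the buffer
//! // is full or the consumer is gone, so the caller is never stalled.
//! fn submit(buffer: &SyncSender<Vec<u8>>, message: Vec<u8>) -> bool {
//!     match buffer.try_send(message) {
//!         Ok(()) => true,
//!         Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => false,
//!     }
//! }
//! ```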
//!
//! # Example
//!
//! ```rust
//! use commonware_p2p::{authenticated::discovery::{self, Network}, Ingress, Manager, Sender, Recipients};
//! use commonware_cryptography::{ed25519, Signer, PrivateKey as _, PublicKey as _};
//! use commonware_runtime::{deterministic, IoBuf, Metrics, Quota, Runner, Spawner, Supervisor};
//! use commonware_utils::{ordered::Set, NZU32};
//! use std::net::{IpAddr, Ipv4Addr, SocketAddr};
//!
//! // Configure context
//! let runtime_cfg = deterministic::Config::default();
//! let runner = deterministic::Runner::new(runtime_cfg);
//!
//! // Generate identity
//! //
//! // In production, the signer should be generated from a secure source of entropy.
//! let signer = ed25519::PrivateKey::from_seed(0);
//!
//! // Generate peers
//! //
//! // In production, peer identities will be provided by some external source of truth
//! // (like the staking set of a blockchain).
//! let peer1 = ed25519::PrivateKey::from_seed(1).public_key();
//! let peer2 = ed25519::PrivateKey::from_seed(2).public_key();
//! let peer3 = ed25519::PrivateKey::from_seed(3).public_key();
//!
//! // Configure bootstrappers
//! //
//! // In production, it is likely that the address of bootstrappers will be some public address.
//! let bootstrapper_addr: Ingress = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3001).into();
//! let bootstrappers = vec![(peer1.clone(), bootstrapper_addr)];
//!
//! // Configure namespace
//! //
//! // In production, use a unique application namespace to prevent cryptographic replay attacks.
//! let application_namespace = b"my-app-namespace";
//!
//! // Configure network
//! //
//! // In production, use a more conservative configuration like `Config::recommended`.
//! const MAX_MESSAGE_SIZE: u32 = 1_024; // 1KB
//! let p2p_cfg = discovery::Config::local(
//!     signer.clone(),
//!     application_namespace,
//!     SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 3000),
//!     SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3000), // Use a specific dialable addr
//!     bootstrappers,
//!     MAX_MESSAGE_SIZE,
//! );
//!
//! // Start context
//! runner.start(|context| async move {
//!     // Initialize network
//!     let (mut network, mut oracle) = Network::new(context.child("network"), p2p_cfg);
//!
//!     // Register authorized peers
//!     //
//!     // In production, this would be updated as new peer sets are created (like when
//!     // the composition of a validator set changes).
//!     oracle.track(
//!         0,
//!         Set::try_from([signer.public_key(), peer1, peer2, peer3]).unwrap(),
//!     );
//!
//!     // Register some channel
//!     const MAX_MESSAGE_BACKLOG: usize = 128;
//!     let (mut sender, receiver) = network.register(
//!         0,
//!         Quota::per_second(NZU32!(1)),
//!         MAX_MESSAGE_BACKLOG,
//!     );
//!
//!     // Run network
//!     network.start();
//!
//!     // Example: Use sender
//!     let _ = sender.send(Recipients::All, IoBuf::from(b"hello"), false);
//!
//!     // Graceful shutdown (stops all spawned tasks)
//!     context.stop(0, None).await.unwrap();
//! });
//! ```

mod actors;
mod channels;
mod config;
mod metrics;
mod network;
mod types;

use thiserror::Error;

/// Errors that can occur when interacting with the network.
#[derive(Error, Debug)]
pub enum Error {
    #[error("network closed")]
    NetworkClosed,
}

pub use actors::tracker::Oracle;
pub use channels::{Receiver, Sender};
pub use config::{Bootstrapper, Config};
pub use network::Network;

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        authenticated::{
            discovery::actors::router::{Actor as RouterActor, Config as RouterConfig},
            relay::Relay,
        },
        CheckedSender as _, Ingress, LimitedSender as _, Manager, Provider, Receiver, Recipients,
        Sender,
    };
    use commonware_actor::{Feedback, Unreliable};
    use commonware_cryptography::{ed25519, Signer as _};
    use commonware_macros::{select, select_loop, test_group, test_traced};
    use commonware_runtime::{
        deterministic, telemetry::metrics::count_running_tasks, tokio, BufferPooler, Clock, Handle,
        IoBuf, Metrics, Network as RNetwork, Quota, Resolver, Runner, Spawner, Supervisor as _,
    };
    use commonware_utils::{channel::mpsc, hostname, ordered::Set, NZUsize, TryCollect, NZU32};
    use rand_core::{CryptoRngCore, RngCore};
    use std::{
        collections::HashSet,
        net::{IpAddr, Ipv4Addr, SocketAddr},
        time::Duration,
    };

    #[derive(Copy, Clone)]
    enum Mode {
        All,
        Some,
        One,
    }

    const MAX_MESSAGE_SIZE: u32 = 1_024 * 1_024; // 1MB
    const DEFAULT_MESSAGE_BACKLOG: usize = 128;

    /// Ensure no message rate limiting occurred.
    ///
    /// If a message is rate limited, it would be formatted as:
    ///
    /// ```text
    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="data_0"} 1
    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="ping"} 1
    /// ```
    fn assert_no_rate_limiting(metrics: &str) {
        assert!(
            !metrics.contains("messages_rate_limited_total{"),
            "no messages should be rate limited: {metrics}"
        );
    }

    /// Test connectivity between `n` peers.
    ///
    /// We set a unique `base_port` for each test to avoid "address already in use"
    /// errors when tests are run immediately after each other.
    async fn run_network(
        context: impl Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
        max_message_size: u32,
        base_port: u16,
        n: usize,
        mode: Mode,
    ) {
        // Create peers
        let mut peers = Vec::new();
        for i in 0..n {
            peers.push(ed25519::PrivateKey::from_seed(i as u64));
        }
        let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

        // Create networks
        let (complete_sender, mut complete_receiver) = mpsc::channel(peers.len());
        for (i, peer) in peers.iter().enumerate() {
            // Create peer context
            let context = context.child("peer").with_attribute("index", i);

            // Derive port
            let port = base_port + i as u16;

            // Create bootstrappers
            let mut bootstrappers = Vec::new();
            if i > 0 {
                bootstrappers.push((
                    addresses[0].clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port).into(),
                ));
            }

            // Create network
            let signer = peer.clone();
            let config = Config::test(
                signer.clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                bootstrappers,
                max_message_size,
            );
            let (mut network, mut oracle) = Network::new(context.child("network"), config);

            // Register peers
            oracle.track(0, Set::try_from(addresses.clone()).unwrap());

            // Register basic application
            let (mut sender, mut receiver) =
                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);

            // Wait to connect to all peers, and then send messages to everyone
            network.start();

            // Send/Receive messages
            context.child("agent").spawn({
                let complete_sender = complete_sender.clone();
                let addresses = addresses.clone();
                move |context| async move {
                    // Wait for all peers to send their identity
                    let receiver = context.child("receiver").spawn(move |_| async move {
                        // Wait for all peers to send their identity
                        let mut received = HashSet::new();
                        while received.len() < n - 1 {
                            // Ensure message equals sender identity
                            let (sender, message) = receiver.recv().await.unwrap();
                            assert_eq!(message, sender.as_ref());

                            // Add to received set
                            received.insert(sender);
                        }
                        complete_sender.send(()).await.unwrap();

                        // Process messages until all finished (or else sender loops could get stuck as a peer may drop)
                        loop {
                            receiver.recv().await.unwrap();
                        }
                    });

                    // Send identity to all peers
                    let msg = signer.public_key();
                    let sender = context.child("sender").spawn(move |context| async move {
                        // Get all peers not including self
                        let mut recipients = addresses.clone();
                        recipients.remove(i);
                        recipients.sort();

                        // Loop forever to account for unexpected message drops
                        loop {
                            match mode {
                                Mode::One => {
                                    for recipient in &recipients {
                                        // Loop until success
                                        loop {
                                            let sent = sender.send(
                                                Recipients::One(recipient.clone()),
                                                msg.as_ref().to_vec(),
                                                true,
                                            );
                                            if sent.len() != 1 {
                                                context.sleep(Duration::from_millis(100)).await;
                                                continue;
                                            }
                                            break;
                                        }
                                    }
                                }
                                Mode::Some | Mode::All => {
                                    // Loop until sends to all peers succeed
                                    loop {
                                        let sent = sender.send(
                                            match mode {
                                                Mode::Some => Recipients::Some(recipients.clone()),
                                                Mode::All => Recipients::All,
                                                _ => unreachable!(),
                                            },
                                            msg.as_ref().to_vec(),
                                            true,
                                        );
                                        if sent.len() != recipients.len() {
                                            context.sleep(Duration::from_millis(100)).await;
                                            continue;
                                        }
                                        break;
                                    }
                                }
                            };

                            // Sleep to avoid busy loop
                            context.sleep(Duration::from_secs(10)).await;
                        }
                    });

                    // Neither task should exit
                    select! {
                        receiver = receiver => {
                            panic!("receiver exited: {receiver:?}");
                        },
                        sender = sender => {
                            panic!("sender exited: {sender:?}");
                        },
                    }
                }
            });
        }

        // Wait for all peers to finish
        for _ in 0..n {
            complete_receiver.recv().await.unwrap();
        }

        // Ensure no message rate limiting occurred
        assert_no_rate_limiting(&context.encode());
    }

    fn run_deterministic_test(seed: u64, mode: Mode) {
        // Configure test
        const NUM_PEERS: usize = 25;
        const BASE_PORT: u16 = 3000;

        // Run first instance
        let executor = deterministic::Runner::seeded(seed);
        let state = executor.start(|context| async move {
            run_network(
                context.child("network"),
                MAX_MESSAGE_SIZE,
                BASE_PORT,
                NUM_PEERS,
                mode,
            )
            .await;
            context.auditor().state()
        });

        // Compare result to second instance
        let executor = deterministic::Runner::seeded(seed);
        let state2 = executor.start(|context| async move {
            run_network(
                context.child("network"),
                MAX_MESSAGE_SIZE,
                BASE_PORT,
                NUM_PEERS,
                mode,
            )
            .await;
            context.auditor().state()
        });
        assert_eq!(state, state2);
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_determinism_one() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::One);
        }
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_determinism_some() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::Some);
        }
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_determinism_all() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::All);
        }
    }

    #[test_traced]
    fn test_tokio_connectivity() {
        let executor = tokio::Runner::default();
        executor.start(|context| async move {
            let base_port = 3000;
            let n = 10;
            run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
        });
    }

    #[test_traced]
    fn test_multi_index_oracle() {
        // Configure test
        let base_port = 3000;
        let n: usize = 100;

        // Initialize context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Create networks
            let mut waiters = Vec::new();
            for (i, peer) in peers.iter().enumerate() {
                // Create peer context
                let context = context.child("peer").with_attribute("index", i);

                // Derive port
                let port = base_port + i as u16;

                // Create bootstrappers
                let mut bootstrappers = Vec::new();
                if i > 0 {
                    bootstrappers.push((
                        addresses[0].clone(),
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port).into(),
                    ));
                }

                // Create network
                let signer = peer.clone();
                let config = Config::test(
                    signer.clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                    bootstrappers,
                    1_024 * 1_024, // 1MB
                );
                let (mut network, mut oracle) = Network::new(context.child("network"), config);

                // Register peers at separate indices
                oracle.track(0, Set::try_from([addresses[0].clone()]).unwrap());
                oracle.track(
                    1,
                    Set::try_from([addresses[1].clone(), addresses[2].clone()]).unwrap(),
                );
                oracle.track(
                    2,
                    addresses
                        .iter()
                        .skip(2)
                        .cloned()
                        .try_collect::<Set<_>>()
                        .unwrap(),
                );

                // Register basic application
                let (mut sender, mut receiver) =
                    network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);

                // Wait to connect to all peers, and then send messages to everyone
                network.start();

                // Send/Receive messages
                let handler = context.child("agent").spawn(move |context| async move {
                    if i == 0 {
                        // Loop until success
                        let msg = signer.public_key();
                        loop {
                            if sender
                                .send(Recipients::All, msg.as_ref().to_vec(), true)
                                .len()
                                == n - 1
                            {
                                break;
                            }

                            // Sleep and try again (avoid busy loop)
                            context.sleep(Duration::from_millis(100)).await;
                        }
                    } else {
                        // Ensure message equals sender identity
                        let (sender, message) = receiver.recv().await.unwrap();
                        assert_eq!(message, sender.as_ref());
                    }
                });

                // Add to waiters
                waiters.push(handler);
            }

            // Wait for waiters to finish (receiver before sender)
            for waiter in waiters.into_iter().rev() {
                waiter.await.unwrap();
            }

            // Ensure no message rate limiting occurred
            assert_no_rate_limiting(&context.encode());
        });
    }

    #[test_traced]
    #[should_panic(expected = "message too large")]
    fn test_message_too_large() {
        // Configure test
        let base_port = 3000;
        let n: usize = 2;

        // Initialize context
        let executor = deterministic::Runner::seeded(0);
        executor.start(|mut context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses: Set<_> = peers.iter().map(|p| p.public_key()).try_collect().unwrap();

            // Create network
            let signer = peers[0].clone();
            let config = Config::test(
                signer,
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
                Vec::new(),
                1_024 * 1_024, // 1MB
            );
            let (mut network, mut oracle) = Network::new(context.child("network"), config);

            // Register peers
            oracle.track(0, addresses.clone());

            // Register basic application
            let (mut sender, _) =
                network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);

            // Wait to connect to all peers, and then send messages to everyone
            network.start();

            // Create random message
            let mut msg = vec![0u8; 10 * 1024 * 1024]; // 10MB (greater than frame capacity)
            context.fill_bytes(&mut msg[..]);

            // Send message
            let recipient = Recipients::One(addresses[1].clone());
            sender.send(recipient, msg, true);
        });
    }

    #[test_traced]
    fn test_rate_limiting() {
        // Configure test
        let base_port = 3000;
        let n: usize = 2;

        // Initialize context
        let executor = deterministic::Runner::seeded(0);
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
            let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
            let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);

            // Create network for peer 0
            let signer0 = peers[0].clone();
            let config0 = Config::test(
                signer0.clone(),
                socket0,
                vec![(peers[1].public_key(), socket1.into())],
                1_024 * 1_024, // 1MB
            );
            let (mut network0, mut oracle0) =
                Network::new(context.child("peer").with_attribute("index", 0), config0);
            oracle0.track(0, Set::try_from(addresses.clone()).unwrap());
            let (mut sender0, _receiver0) =
                network0.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
            network0.start();

            // Create network for peer 1
            let signer1 = peers[1].clone();
            let config1 = Config::test(
                signer1.clone(),
                socket1,
                vec![(peers[0].public_key(), socket0.into())],
                1_024 * 1_024, // 1MB
            );
            let (mut network1, mut oracle1) =
                Network::new(context.child("peer").with_attribute("index", 1), config1);
            oracle1.track(0, Set::try_from(addresses.clone()).unwrap());
            let (_sender1, _receiver1) =
                network1.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
734            network1.start();
735
736            // Send first message, which should be allowed and consume the quota.
737            let msg = vec![0u8; 1024]; // 1KB
738            loop {
739                // Confirm message is sent to peer
740                let checked = sender0.check(Recipients::All).unwrap();
741                if !checked.recipients().is_empty() {
742                    checked.send(msg.clone(), true);
743                    break;
744                }
745
746                // Ensure we don't rate limit outbound sends while
747                // waiting for peers to connect
748                context.sleep(Duration::from_mins(1)).await
749            }
750
751            // Immediately send the second message to trigger the rate limit.
752            let sent = sender0.send(Recipients::One(addresses[1].clone()), msg, true);
753            assert!(sent.is_empty());
754
755            // Give the metrics time to reflect the rate-limited message.
756            for _ in 0..10 {
757                assert_no_rate_limiting(&context.encode());
758                context.sleep(Duration::from_millis(100)).await;
759            }
760        });
761    }
762
    #[test_traced]
    fn test_unordered_peer_sets() {
        let (n, base_port) = (10, 3000);
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let mut peers_and_sks = Vec::new();
            for i in 0..n {
                let sk = ed25519::PrivateKey::from_seed(i as u64);
                let pk = sk.public_key();
                let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
                peers_and_sks.push((sk, pk, addr));
            }
            let peer0 = peers_and_sks[0].clone();
            let config = Config::test(
                peer0.0,
                peer0.2,
                vec![(peer0.1.clone(), peer0.2.into())],
                1_024 * 1_024,
            );
            let (network, mut oracle) = Network::new(context.child("network"), config);
            network.start();

            // Subscribe to peer sets
            let mut subscription = oracle.subscribe().await;

            // Register initial peer set
            let set10: Set<_> = peers_and_sks
                .iter()
                .take(2)
                .map(|(_, pk, _)| pk.clone())
                .try_collect()
                .unwrap();
            oracle.track(10, set10.clone());
            let update = subscription.recv().await.unwrap();
            assert_eq!(update.index, 10);
            assert_eq!(update.latest.primary, set10);
            assert!(update.latest.secondary.is_empty());
            assert_eq!(update.all.primary, set10);
            assert!(update.all.secondary.is_empty());

            // Register old peer sets (ignored)
            let set9: Set<_> = peers_and_sks
                .iter()
                .skip(2)
                .map(|(_, pk, _)| pk.clone())
                .try_collect()
                .unwrap();
            oracle.track(9, set9.clone());

            // Add new peer set
            let set11: Set<_> = peers_and_sks
                .iter()
                .skip(4)
                .map(|(_, pk, _)| pk.clone())
                .try_collect()
                .unwrap();
            oracle.track(11, set11.clone());
            let update = subscription.recv().await.unwrap();
            assert_eq!(update.index, 11);
            assert_eq!(update.latest.primary, set11);
            assert!(update.latest.secondary.is_empty());
            let all_keys: Set<_> = set10
                .into_iter()
                .chain(set11.into_iter())
                .try_collect()
                .unwrap();
            assert_eq!(update.all.primary, all_keys);
            assert!(update.all.secondary.is_empty());
        });
    }

    #[test_traced]
    fn test_graceful_shutdown() {
        let base_port = 3000;
        let n: usize = 5;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Create networks for all peers
            let (complete_sender, mut complete_receiver) = mpsc::channel(n);
            for (i, peer) in peers.iter().enumerate() {
                let peer_context = context.child("peer").with_attribute("index", i);
                let port = base_port + i as u16;

                // Create bootstrappers (everyone connects to peer 0)
                let mut bootstrappers = Vec::new();
                if i > 0 {
                    bootstrappers.push((
                        addresses[0].clone(),
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port).into(),
                    ));
                }

                let signer = peer.clone();
                let config = Config::test(
                    signer.clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                    bootstrappers,
                    1_024 * 1_024, // 1MB
                );
                let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);

                // Register peer set
                oracle.track(0, Set::try_from(addresses.clone()).unwrap());

                let (mut sender, mut receiver) =
                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
                network.start();

                peer_context.child("agent").spawn({
                    let complete_sender = complete_sender.clone();
                    move |context| async move {
                        // Wait to connect to at least one other peer (except for peer 0 which is the bootstrapper)
                        let expected_connections = if i == 0 { n - 1 } else { 1 };

                        // Send a message
                        let msg = signer.public_key();
                        loop {
                            let sent = sender.send(Recipients::All, msg.as_ref().to_vec(), true);
                            if sent.len() >= expected_connections {
                                break;
                            }
                            context.sleep(Duration::from_millis(100)).await;
                        }

                        // Signal that this peer is connected
                        complete_sender.send(()).await.unwrap();

                        // Keep receiving messages until shutdown
                        select_loop! {
                            context,
                            on_stopped => {},
                            Ok(_) = receiver.recv() else break => {},
                        }
                    }
                });
            }

            // Wait for all peers to establish connectivity
            for _ in 0..n {
                complete_receiver.recv().await.unwrap();
            }

            // Verify that network actors started for all peers
            let metrics_before = context.encode();
            let tasks_running = |metrics: &str, name: &str| -> Option<u64> {
                metrics.lines().find_map(|line| {
                    (line.starts_with("runtime_tasks_running{")
                        && line.contains(&format!("name=\"{name}\""))
                        && line.contains("kind=\"Task\""))
                    .then(|| {
                        line.split_whitespace()
                            .next_back()
                            .expect("metric line must have a value")
                            .parse::<u64>()
                            .expect("running task count must be an integer")
                    })
                })
            };

            for actor in ["tracker", "router", "spawner", "listener", "dialer"] {
                let name = format!("peer_network_{actor}");
                assert_eq!(
                    tasks_running(&metrics_before, &name),
                    Some(n as u64),
                    "{name} should have {n} running tasks before shutdown"
                );
            }

            // All peers are connected - now trigger graceful shutdown
            // by stopping the context
            context.child("shutdown").spawn(move |context| async move {
                // Trigger graceful shutdown
                let result = context.stop(0, Some(Duration::from_secs(5))).await;

                // Shutdown should complete successfully without timeout
                assert!(
                    result.is_ok(),
                    "graceful shutdown should complete: {result:?}"
                );
            });

            // Wait for shutdown to complete
            context.stopped().await.unwrap();

            // Give the runtime a tick to process task completions and update metrics
            context.sleep(Duration::from_millis(100)).await;

            // Verify that all network actors stopped
            let metrics_after = context.encode();
            for actor in ["tracker", "router", "spawner", "listener", "dialer"] {
                let name = format!("peer_network_{actor}");
                assert_eq!(
                    tasks_running(&metrics_after, &name),
                    Some(0),
                    "{name} should be stopped"
                );
            }
        });
    }

    #[test_traced]
    fn test_subscription_includes_self_when_registered() {
        let base_port = 3000;
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create self (peer0) and other peers
            let self_sk = ed25519::PrivateKey::from_seed(0);
            let self_pk = self_sk.public_key();
            let self_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);

            let other_pk = ed25519::PrivateKey::from_seed(1).public_key();

            // Create network for peer0 (self)
            let config = Config::test(
                self_sk,
                self_addr,
                vec![], // No bootstrappers
                1_024 * 1_024,
            );
            let (network, mut oracle) = Network::new(context.child("network"), config);
            network.start();

            // Subscribe to peer sets
            let mut subscription = oracle.subscribe().await;

            // Register a peer set that does NOT include self
            let peer_set: Set<_> = [other_pk.clone()].try_into().unwrap();
            oracle.track(1, peer_set.clone());

            // Receive subscription notification
            let update = subscription.recv().await.unwrap();
            assert_eq!(update.index, 1);
            assert_eq!(update.latest.primary.len(), 1);
            assert!(update.latest.secondary.is_empty());
            assert_eq!(update.all.primary.len(), 1);
            assert!(update.all.secondary.is_empty());

            // Self should NOT be in the latest set
            assert!(
                update.latest.primary.position(&self_pk).is_none(),
                "latest set should not include self"
            );
            assert!(
                update.latest.primary.position(&other_pk).is_some(),
                "latest set should include other"
            );

            // Self should NOT be in the peer set (not tracked)
            assert!(
                update.all.primary.position(&self_pk).is_none(),
                "peer set should not include self"
            );
            assert!(
                update.all.primary.position(&other_pk).is_some(),
                "peer set should include other"
            );

            // Now register a peer set that DOES include self
            let peer_set: Set<_> = [self_pk.clone(), other_pk.clone()].try_into().unwrap();
            oracle.track(2, peer_set.clone());

            // Receive subscription notification
            let update = subscription.recv().await.unwrap();
            assert_eq!(update.index, 2);
            assert_eq!(update.latest.primary.len(), 2);
            assert!(update.latest.secondary.is_empty());
            assert_eq!(update.all.primary.len(), 2);
            assert!(update.all.secondary.is_empty());

            // Both peers should be in the latest set
            assert!(
                update.latest.primary.position(&self_pk).is_some(),
                "latest set should include self"
            );
            assert!(
                update.latest.primary.position(&other_pk).is_some(),
                "latest set should include other"
            );

            // Both peers should be in the peer set
            assert!(
                update.all.primary.position(&self_pk).is_some(),
                "peer set should include self"
            );
            assert!(
                update.all.primary.position(&other_pk).is_some(),
                "peer set should include other"
            );
        });
    }

    #[test_traced]
    fn test_dns_bootstrapper_resolution() {
        let base_port = 3000;
        let n: usize = 3;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Register DNS mapping for the bootstrapper (peer 0)
            let bootstrapper_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
            context.resolver_register("boot.local", Some(vec![bootstrapper_ip]));

            // Create networks
            let (complete_sender, mut complete_receiver) = mpsc::channel(n);
            for (i, peer) in peers.iter().enumerate() {
                let context = context.child("peer").with_attribute("index", i);
                let port = base_port + i as u16;

                // Create bootstrappers - use DNS for non-zero peers
                let bootstrappers = if i > 0 {
                    vec![(
                        addresses[0].clone(),
                        Ingress::Dns {
                            host: hostname!("boot.local"),
                            port: base_port,
                        },
                    )]
                } else {
                    vec![]
                };

                // Create network
                let signer = peer.clone();
                let config = Config::test(
                    signer.clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                    bootstrappers,
                    1_024 * 1_024,
                );
                let (mut network, mut oracle) = Network::new(context.child("network"), config);

                // Register peers
                oracle.track(0, Set::try_from(addresses.clone()).unwrap());

                // Register channel
                let (mut sender, mut receiver) =
                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);

                network.start();

                // Send/Receive messages
                context.child("agent").spawn({
                    let complete_sender = complete_sender.clone();
                    let addresses = addresses.clone();
                    move |context| async move {
                        // Wait for messages from other peers
                        let receiver = context.child("receiver").spawn(move |_| async move {
                            let mut received = HashSet::new();
                            while received.len() < n - 1 {
                                let (sender, message) = receiver.recv().await.unwrap();
                                assert_eq!(message, sender.as_ref());
                                received.insert(sender);
                            }
                            complete_sender.send(()).await.unwrap();

                            loop {
                                receiver.recv().await.unwrap();
                            }
                        });

                        // Send identity to all peers
                        let msg = signer.public_key();
                        let sender = context.child("sender").spawn(move |context| async move {
                            loop {
                                let mut recipients = addresses.clone();
                                recipients.remove(i);
                                recipients.sort();

                                loop {
                                    let mut sent =
                                        sender.send(Recipients::All, msg.as_ref().to_vec(), true);
                                    if sent.len() != recipients.len() {
                                        context.sleep(Duration::from_millis(100)).await;
                                        continue;
                                    }
                                    sent.sort();
                                    assert_eq!(sent, recipients);
                                    break;
                                }

                                context.sleep(Duration::from_secs(10)).await;
                            }
                        });

                        select! {
                            receiver = receiver => {
                                panic!("receiver exited: {receiver:?}")
                            },
                            sender = sender => {
                                panic!("sender exited: {sender:?}")
                            },
                        }
                    }
                });
            }

            // Wait for all peers to exchange messages
            for _ in 0..n {
                complete_receiver.recv().await.unwrap();
            }

            assert_no_rate_limiting(&context.encode());
        });
    }

    #[test_traced]
    fn test_dns_resolution_failure_then_success() {
        let base_port = 3100;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create 2 peers
            let peer0 = ed25519::PrivateKey::from_seed(0);
            let peer1 = ed25519::PrivateKey::from_seed(1);
            let addresses = vec![peer0.public_key(), peer1.public_key()];

            let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
            let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);

            // Do NOT register DNS mapping initially - peer 1 will fail to resolve

            // Create network for peer 0 (bootstrapper, no DNS)
            let config0 = Config::test(peer0.clone(), socket0, vec![], 1_024 * 1_024);
            let (mut network0, mut oracle0) =
                Network::new(context.child("peer").with_attribute("index", 0), config0);
            oracle0.track(0, Set::try_from(addresses.clone()).unwrap());
            let (mut sender0, mut receiver0) =
                network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            network0.start();

            // Create network for peer 1 with DNS bootstrapper
            let config1 = Config::test(
                peer1.clone(),
                socket1,
                vec![(
                    peer0.public_key(),
                    Ingress::Dns {
                        host: hostname!("boot.local"),
                        port: base_port,
                    },
                )],
                1_024 * 1_024,
            );
            let (mut network1, mut oracle1) =
                Network::new(context.child("peer").with_attribute("index", 1), config1);
            oracle1.track(0, Set::try_from(addresses.clone()).unwrap());
            let (mut sender1, mut receiver1) =
                network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            network1.start();

            // Wait a bit - peer 1 should fail to connect (DNS not registered)
            context.sleep(Duration::from_secs(2)).await;

            // Verify peer 0 cannot send to peer 1 yet
            let checked = sender0.check(Recipients::All).unwrap();
            assert!(
                checked.recipients().is_empty(),
                "should not be connected yet"
            );

            // Now register the DNS mapping
            context.resolver_register("boot.local", Some(vec![IpAddr::V4(Ipv4Addr::LOCALHOST)]));

            // Wait for peer 1 to connect via DNS resolution
            let pk0 = peer0.public_key();
            let pk1 = peer1.public_key();
            let msg0 = pk0.to_vec();
            let msg1 = pk1.to_vec();

            // Spawn receiver tasks
            let (done_sender, mut done_receiver) = mpsc::channel::<()>(2);
            let done0 = done_sender.clone();
            let pk1_clone = pk1.clone();
            context
                .child("recv")
                .with_attribute("index", 0)
                .spawn(move |_| async move {
                    let (sender, message) = receiver0.recv().await.unwrap();
                    assert_eq!(sender, pk1_clone);
                    assert_eq!(message, msg1.as_slice());
                    done0.send(()).await.unwrap();
                });
            let done1 = done_sender.clone();
            let pk0_clone = pk0.clone();
            context
                .child("recv")
                .with_attribute("index", 1)
                .spawn(move |_| async move {
                    let (sender, message) = receiver1.recv().await.unwrap();
                    assert_eq!(sender, pk0_clone);
                    assert_eq!(message, msg0.as_slice());
                    done1.send(()).await.unwrap();
                });

            let mut received = 0;
            while received < 2 {
                let sent0 = sender0.send(Recipients::One(pk1.clone()), pk0.as_ref().to_vec(), true);
                let sent1 = sender1.send(Recipients::One(pk0.clone()), pk1.as_ref().to_vec(), true);
                assert!(!sent0.is_empty());
                assert!(!sent1.is_empty());

                select! {
                    done = done_receiver.recv() => {
                        done.expect("receiver task stopped");
                        received += 1;
                    },
                    _ = context.sleep(Duration::from_millis(100)) => {},
                }
            }
        });
    }

    /// Helper to run DNS connectivity test with a specific seed and return auditor state.
    fn run_dns_connectivity(seed: u64) -> String {
        let base_port = 3400;
        let n: usize = 3;

        let executor = deterministic::Runner::seeded(seed);
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Register DNS mappings
            let bootstrapper_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
            context.resolver_register("boot.local", Some(vec![bootstrapper_ip]));

            // Create networks
            let (complete_sender, mut complete_receiver) = mpsc::channel(n);
            for (i, peer) in peers.iter().enumerate() {
                let context = context.child("peer").with_attribute("index", i);
                let port = base_port + i as u16;

                // Use DNS for bootstrapper
                let bootstrappers = if i > 0 {
                    vec![(
                        addresses[0].clone(),
                        Ingress::Dns {
                            host: hostname!("boot.local"),
                            port: base_port,
                        },
                    )]
                } else {
                    vec![]
                };

                let signer = peer.clone();
                let config = Config::test(
                    signer.clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                    bootstrappers,
                    1_024 * 1_024,
                );
                let (mut network, mut oracle) = Network::new(context.child("network"), config);
                oracle.track(0, Set::try_from(addresses.clone()).unwrap());
                let (mut sender, mut receiver) =
                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
                network.start();

                context.child("agent").spawn({
                    let complete_sender = complete_sender.clone();
                    let addresses = addresses.clone();
                    move |context| async move {
                        let receiver = context.child("receiver").spawn(move |_| async move {
                            let mut received = HashSet::new();
                            while received.len() < n - 1 {
                                let (sender, message) = receiver.recv().await.unwrap();
                                assert_eq!(message, sender.as_ref());
                                received.insert(sender);
                            }
                            complete_sender.send(()).await.unwrap();
                            loop {
                                receiver.recv().await.unwrap();
                            }
                        });

                        let msg = signer.public_key();
                        let sender = context.child("sender").spawn(move |context| async move {
                            loop {
                                let mut recipients = addresses.clone();
                                recipients.remove(i);
                                recipients.sort();

                                loop {
                                    let mut sent =
                                        sender.send(Recipients::All, msg.as_ref().to_vec(), true);
                                    if sent.len() != recipients.len() {
                                        context.sleep(Duration::from_millis(100)).await;
                                        continue;
                                    }
                                    sent.sort();
                                    assert_eq!(sent, recipients);
                                    break;
                                }

                                context.sleep(Duration::from_secs(10)).await;
                            }
                        });

                        select! {
                            receiver = receiver => {
                                panic!("receiver exited: {receiver:?}")
                            },
                            sender = sender => {
                                panic!("sender exited: {sender:?}")
                            },
                        }
                    }
                });
            }

            for _ in 0..n {
                complete_receiver.recv().await.unwrap();
            }

            context.auditor().state()
        })
    }

1401    #[test_traced]
1402    fn test_dns_resolution_determinism() {
1403        // Run the same test twice with the same seed
1404        let state1 = run_dns_connectivity(42);
1405        let state2 = run_dns_connectivity(42);
1406        assert_eq!(state1, state2, "DNS resolution should be deterministic");
1407    }
1408
1409    #[test_traced]
1410    fn test_dns_resolving_to_private_ip_not_dialed() {
1411        // Test that when allow_private_ips=false, DNS addresses that resolve
1412        // to private IPs are not dialed.
1413        let base_port = 3300;
1414        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1415        executor.start(|context| async move {
1416            let peer0 = ed25519::PrivateKey::from_seed(0);
1417            let peer1 = ed25519::PrivateKey::from_seed(1);
1418
1419            let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
1420            let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
1421
1422            // Register DNS mapping that resolves to localhost (private IP)
1423            context.resolver_register("boot.local".to_string(), Some(vec![socket0.ip()]));
1424
1425            let addresses: Vec<_> = vec![peer0.public_key(), peer1.public_key()];
1426
1427            // Create peer 0 (bootstrapper) with allow_private_ips=true
1428            let mut config0 = Config::test(peer0.clone(), socket0, vec![], 1_024 * 1_024);
1429            config0.allow_private_ips = true;
1430            let (mut network0, mut oracle0) =
1431                Network::new(context.child("peer").with_attribute("index", 0), config0);
1432            oracle0.track(0, Set::try_from(addresses.clone()).unwrap());
1433            let (_sender0, mut receiver0) =
1434                network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1435            network0.start();
1436
1437            // Create peer 1 with allow_private_ips=false using DNS bootstrapper
1438            let bootstrappers = vec![(
1439                peer0.public_key(),
1440                Ingress::Dns {
1441                    host: hostname!("boot.local"),
1442                    port: base_port,
1443                },
1444            )];
1445            let mut config1 = Config::test(peer1.clone(), socket1, bootstrappers, 1_024 * 1_024);
1446            config1.allow_private_ips = false; // This should prevent dialing the private IP
1447            let (mut network1, mut oracle1) =
1448                Network::new(context.child("peer").with_attribute("index", 1), config1);
1449            oracle1.track(0, Set::try_from(addresses.clone()).unwrap());
1450            let (mut sender1, _receiver1) =
1451                network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1452            network1.start();
1453
1454            // Wait for a period during which peer 1 would normally connect
1455            context.sleep(Duration::from_secs(5)).await;
1456
1457            // Check which recipients peer 1 can reach - there should be none since private IPs are blocked
1458            let checked = sender1.check(Recipients::All).unwrap();
1459            assert!(
1460                checked.recipients().is_empty(),
1461                "peer 1 should not have connected to peer 0 (private IP)"
1462            );
1463
1464            // Verify peer 0 received nothing from peer 1
1465            select! {
1466                msg = receiver0.recv() => {
1467                    panic!("peer 0 should not have received any message, got: {msg:?}");
1468                },
1469                _ = context.sleep(Duration::from_secs(1)) => {
1470                    // Expected: timeout with no message
1471                },
1472            }
1473        });
1474    }
1475
1476    #[test_traced]
1477    fn test_dns_mixed_ips_connectivity() {
1478        // Test that peers can connect even when DNS resolves to multiple IPs
1479        // where most are unreachable. The dialer randomly picks an IP, so
1480        // eventually it should pick the reachable one.
1481        //
1482        // Run over 25 seeds to ensure we exercise the random IP selection.
1483        for seed in 0..25 {
1484            let base_port = 3400;
1485
1486            let cfg = deterministic::Config::default()
1487                .with_seed(seed)
1488                .with_timeout(Some(Duration::from_secs(120)));
1489            let executor = deterministic::Runner::new(cfg);
1490            executor.start(|context| async move {
1491                let peer0 = ed25519::PrivateKey::from_seed(0);
1492                let peer1 = ed25519::PrivateKey::from_seed(1);
1493
1494                let good_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
1495                let socket0 = SocketAddr::new(good_ip, base_port);
1496                let socket1 = SocketAddr::new(good_ip, base_port + 1);
1497
1498                // Register DNS mappings with 3 bad IPs and 1 good IP for both peers
1499                let mut all_ips0: Vec<IpAddr> = (1..=3)
1500                    .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 100 + i)))
1501                    .collect();
1502                all_ips0.push(good_ip);
1503                context.resolver_register("peer-0.local", Some(all_ips0));
1504
1505                let mut all_ips1: Vec<IpAddr> = (1..=3)
1506                    .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 110 + i)))
1507                    .collect();
1508                all_ips1.push(good_ip);
1509                context.resolver_register("peer-1.local", Some(all_ips1));
1510
1511                let addresses: Vec<_> = vec![peer0.public_key(), peer1.public_key()];
1512
1513                // Create peer 0 with peer 1 as DNS bootstrapper
1514                let bootstrappers0 = vec![(
1515                    peer1.public_key(),
1516                    Ingress::Dns {
1517                        host: hostname!("peer-1.local"),
1518                        port: base_port + 1,
1519                    },
1520                )];
1521                let config0 = Config::test(peer0.clone(), socket0, bootstrappers0, 1_024 * 1_024);
1522                let (mut network0, mut oracle0) =
1523                    Network::new(context.child("peer").with_attribute("index", 0), config0);
1524                oracle0.track(0, Set::try_from(addresses.clone()).unwrap());
1525                let (_sender0, mut receiver0) =
1526                    network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1527                network0.start();
1528
1529                // Create peer 1 with peer 0 as DNS bootstrapper
1530                let bootstrappers1 = vec![(
1531                    peer0.public_key(),
1532                    Ingress::Dns {
1533                        host: hostname!("peer-0.local"),
1534                        port: base_port,
1535                    },
1536                )];
1537                let config1 = Config::test(peer1.clone(), socket1, bootstrappers1, 1_024 * 1_024);
1538                let (mut network1, mut oracle1) =
1539                    Network::new(context.child("peer").with_attribute("index", 1), config1);
1540                oracle1.track(0, Set::try_from(addresses.clone()).unwrap());
1541                let (mut sender1, _receiver1) =
1542                    network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1543                network1.start();
1544
1545                // Send until peers connect (may take multiple attempts due to random IP selection).
1546                loop {
1547                    let checked = sender1.check(Recipients::All).unwrap();
1548                    if !checked.recipients().is_empty() {
1549                        checked.send(peer1.public_key().as_ref().to_vec(), true);
1550                    }
1551
1552                    select! {
1553                        result = receiver0.recv() => {
1554                            let (sender, msg) = result.unwrap();
1555                            assert_eq!(sender, peer1.public_key());
1556                            assert_eq!(msg, peer1.public_key().as_ref());
1557                            break;
1558                        },
1559                        _ = context.sleep(Duration::from_millis(100)) => {},
1560                    }
1561                }
1562            });
1563        }
1564    }
1565
1566    #[test_traced]
1567    fn test_many_peer_restart_with_new_address() {
1568        let base_port = 7500;
1569        let n = 5;
1570
1571        let executor = deterministic::Runner::default();
1572        executor.start(|context| async move {
1573            // Create peers
1574            let peers: Vec<_> = (0..n)
1575                .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1576                .collect();
1577            let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1578
1579            // Track senders/receivers/handles across restarts
1580            let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1581            let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1582            let mut handles: Vec<Option<Handle<()>>> = (0..n).map(|_| None).collect();
1583
1584            // Track port allocations (updated on restart)
1585            let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1586
1587            // Create networks for all peers (peer 0 is bootstrapper)
1588            for (i, peer) in peers.iter().enumerate() {
1589                let peer_context = context.child("peer").with_attribute("index", i);
1590
1591                // Non-bootstrapper peers point to peer 0
1592                let mut bootstrappers = Vec::new();
1593                if i > 0 {
1594                    bootstrappers.push((
1595                        addresses[0].clone(),
1596                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
1597                    ));
1598                }
1599
1600                let config = Config::test(
1601                    peer.clone(),
1602                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
1603                    bootstrappers,
1604                    MAX_MESSAGE_SIZE,
1605                );
1606                let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
1607
1608                // Register peer set
1609                oracle.track(0, Set::try_from(addresses.clone()).unwrap());
1610
1611                let (sender, receiver) =
1612                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1613                senders[i] = Some(sender);
1614                receivers[i] = Some(receiver);
1615
1616                let handle = network.start();
1617                handles[i] = Some(handle);
1618            }
1619
1620            // Wait for full connectivity (each peer can send to all others)
1621            for (i, sender) in senders.iter_mut().enumerate() {
1622                let sender = sender.as_mut().unwrap();
1623                loop {
1624                    let sent = sender.send(
1625                        Recipients::All,
1626                        peers[i].public_key().as_ref().to_vec(),
1627                        true,
1628                    );
1629                    if sent.len() == n - 1 {
1630                        break;
1631                    }
1632                    context.sleep(Duration::from_millis(100)).await;
1633                }
1634            }
1635
1636            // Verify each peer can receive from all others
1637            for receiver in receivers.iter_mut() {
1638                let receiver = receiver.as_mut().unwrap();
1639                let mut received = HashSet::new();
1640                while received.len() < n - 1 {
1641                    let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1642                    assert_eq!(message, sender.as_ref());
1643                    received.insert(sender);
1644                }
1645            }
1646
1647            // Restart each non-first peer with a new port, multiple rounds
1648            let mut restart_counter = 0u16;
1649            for round in 0..3 {
1650                for restart_peer_idx in 1..n {
1651                    // Allocate a new unique port
1652                    restart_counter += 1;
1653                    let new_port = base_port + 100 + restart_counter;
1654                    ports[restart_peer_idx] = new_port;
1655
1656                    // Abort the peer's network
1657                    if let Some(handle) = handles[restart_peer_idx].take() {
1658                        handle.abort();
1659                    }
1660                    senders[restart_peer_idx] = None;
1661                    receivers[restart_peer_idx] = None;
1662
1663                    // Restart the peer with new port (uses bootstrapper for discovery)
1664                    let peer_context = context
1665                        .child("peer_round")
1666                        .with_attribute("index", restart_peer_idx)
1667                        .with_attribute("round", round);
1668                    let bootstrappers = vec![(
1669                        addresses[0].clone(),
1670                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
1671                    )];
1672                    let config = Config::test(
1673                        peers[restart_peer_idx].clone(),
1674                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), new_port),
1675                        bootstrappers,
1676                        MAX_MESSAGE_SIZE,
1677                    );
1678                    let (mut network, mut oracle) =
1679                        Network::new(peer_context.child("network"), config);
1680
1681                    // Register peer set
1682                    oracle.track(0, Set::try_from(addresses.clone()).unwrap());
1683
1684                    let (sender, receiver) = network.register(
1685                        0,
1686                        Quota::per_second(NZU32!(100)),
1687                        DEFAULT_MESSAGE_BACKLOG,
1688                    );
1689                    senders[restart_peer_idx] = Some(sender);
1690                    receivers[restart_peer_idx] = Some(receiver);
1691
1692                    let handle = network.start();
1693                    handles[restart_peer_idx] = Some(handle);
1694
1695                    // Wait for the restarted peer to reconnect to all others
1696                    let restarted_sender = senders[restart_peer_idx].as_mut().unwrap();
1697                    loop {
1698                        let sent = restarted_sender.send(
1699                            Recipients::All,
1700                            peers[restart_peer_idx].public_key().as_ref().to_vec(),
1701                            true,
1702                        );
1703                        if sent.len() == n - 1 {
1704                            break;
1705                        }
1706                        context.sleep(Duration::from_millis(100)).await;
1707                    }
1708
1709                    // Verify other peers can send to the restarted peer
1710                    for i in 0..n {
1711                        if i == restart_peer_idx {
1712                            continue;
1713                        }
1714                        let sender = senders[i].as_mut().unwrap();
1715                        loop {
1716                            let sent = sender.send(
1717                                Recipients::One(addresses[restart_peer_idx].clone()),
1718                                peers[i].public_key().as_ref().to_vec(),
1719                                true,
1720                            );
1721                            if sent.len() == 1 {
1722                                break;
1723                            }
1724                            context.sleep(Duration::from_millis(100)).await;
1725                        }
1726                    }
1727
1728                    // Verify the restarted peer receives messages from all others
1729                    let restarted_receiver = receivers[restart_peer_idx].as_mut().unwrap();
1730                    let mut received = HashSet::new();
1731                    while received.len() < n - 1 {
1732                        let (sender, message): (ed25519::PublicKey, _) =
1733                            restarted_receiver.recv().await.unwrap();
1734                        assert_eq!(message, sender.as_ref());
1735                        received.insert(sender);
1736                    }
1737                }
1738            }
1739
1740            assert_no_rate_limiting(&context.encode());
1741        });
1742    }
1743
1744    #[test_traced]
1745    fn test_simultaneous_peer_restart() {
1746        let base_port = 7700;
1747        let n = 5;
1748
1749        let executor = deterministic::Runner::default();
1750        executor.start(|context| async move {
1751            // Create peers
1752            let peers: Vec<_> = (0..n)
1753                .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1754                .collect();
1755            let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1756
1757            // Track port allocations (updated on restart)
1758            let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1759
1760            // Track senders/receivers/handles across restarts
1761            let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1762            let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1763            let mut handles: Vec<Option<Handle<()>>> = (0..n).map(|_| None).collect();
1764
1765            // Create networks for all peers (peer 0 is bootstrapper)
1766            for (i, peer) in peers.iter().enumerate() {
1767                let peer_context = context.child("peer").with_attribute("index", i);
1768
1769                // Non-bootstrapper peers point to peer 0
1770                let mut bootstrappers = Vec::new();
1771                if i > 0 {
1772                    bootstrappers.push((
1773                        addresses[0].clone(),
1774                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
1775                    ));
1776                }
1777
1778                let config = Config::test(
1779                    peer.clone(),
1780                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
1781                    bootstrappers,
1782                    MAX_MESSAGE_SIZE,
1783                );
1784                let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
1785
1786                // Register peer set
1787                oracle.track(0, Set::try_from(addresses.clone()).unwrap());
1788
1789                let (sender, receiver) =
1790                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1791                senders[i] = Some(sender);
1792                receivers[i] = Some(receiver);
1793
1794                let handle = network.start();
1795                handles[i] = Some(handle);
1796            }
1797
1798            // Wait for full connectivity (each peer can send to all others)
1799            for (i, sender) in senders.iter_mut().enumerate() {
1800                let sender = sender.as_mut().unwrap();
1801                loop {
1802                    let sent = sender.send(
1803                        Recipients::All,
1804                        peers[i].public_key().as_ref().to_vec(),
1805                        true,
1806                    );
1807                    if sent.len() == n - 1 {
1808                        break;
1809                    }
1810                    context.sleep(Duration::from_millis(100)).await;
1811                }
1812            }
1813
1814            // Verify each peer can receive from all others
1815            for receiver in receivers.iter_mut() {
1816                let receiver = receiver.as_mut().unwrap();
1817                let mut received = HashSet::new();
1818                while received.len() < n - 1 {
1819                    let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1820                    assert_eq!(message, sender.as_ref());
1821                    received.insert(sender);
1822                }
1823            }
1824
1825            // Shutdown all non-bootstrapper peers simultaneously.
1826            //
1827            // We keep the bootstrapper (peer 0) alive to exercise the case
1828            // where multiple peers churn at once.
1829            let restart_peers: Vec<usize> = (1..n).collect();
1830            for &idx in &restart_peers {
1831                if let Some(handle) = handles[idx].take() {
1832                    handle.abort();
1833                }
1834                senders[idx] = None;
1835                receivers[idx] = None;
1836                // Allocate new ports for restarted peers
1837                ports[idx] = base_port + 100 + idx as u16;
1838            }
1839
1840            // Wait for connections to be detected as closed
1841            context.sleep(Duration::from_secs(2)).await;
1842
1843            // Restart all peers with new ports (uses bootstrapper for discovery)
1844            for &idx in &restart_peers {
1845                let peer_context = context.child("peer_restarted").with_attribute("index", idx);
1846                let bootstrappers = vec![(
1847                    addresses[0].clone(),
1848                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
1849                )];
1850                let config = Config::test(
1851                    peers[idx].clone(),
1852                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[idx]),
1853                    bootstrappers,
1854                    MAX_MESSAGE_SIZE,
1855                );
1856                let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
1857
1858                // Register peer set
1859                oracle.track(0, Set::try_from(addresses.clone()).unwrap());
1860
1861                let (sender, receiver) =
1862                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1863                senders[idx] = Some(sender);
1864                receivers[idx] = Some(receiver);
1865
1866                let handle = network.start();
1867                handles[idx] = Some(handle);
1868            }
1869
1870            // Wait for full connectivity after restart
1871            for (i, sender) in senders.iter_mut().enumerate() {
1872                let sender = sender.as_mut().unwrap();
1873                loop {
1874                    let sent = sender.send(
1875                        Recipients::All,
1876                        peers[i].public_key().as_ref().to_vec(),
1877                        true,
1878                    );
1879                    if sent.len() == n - 1 {
1880                        break;
1881                    }
1882                    context.sleep(Duration::from_millis(100)).await;
1883                }
1884            }
1885
1886            // Verify each peer can receive from all others after restart
1887            for receiver in receivers.iter_mut() {
1888                let receiver = receiver.as_mut().unwrap();
1889                let mut received = HashSet::new();
1890                while received.len() < n - 1 {
1891                    let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1892                    assert_eq!(message, sender.as_ref());
1893                    received.insert(sender);
1894                }
1895            }
1896
1897            assert_no_rate_limiting(&context.encode());
1898        });
1899    }
1900    #[test_traced]
1901    fn test_peer_restart_with_new_address_must_dial() {
1902        let base_port = 3600;
1903        let n: usize = 5;
1904
1905        let executor = deterministic::Runner::default();
1906        executor.start(|context| async move {
1907            // Create peers
1908            let peers: Vec<_> = (0..n)
1909                .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1910                .collect();
1911            let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1912
1913            // Track all senders/receivers/handles across restarts
1914            let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1915            let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1916            let mut handles: Vec<Option<Handle<()>>> = (0..n).map(|_| None).collect();
1917
1918            // Track port allocations for each peer (updated on restart)
1919            let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1920
1921            // Peer 2 will advertise a WRONG dialable address (unreachable IP).
1922            // All other peers advertise correct addresses.
1923            // This means that when peer 1 restarts, it CANNOT dial peer 2 (wrong address);
1924            // peer 2 must dial peer 1 to reconnect.
1925            let wrong_ip = IpAddr::V4(Ipv4Addr::new(10, 255, 255, 1)); // Unreachable IP
1926            let wrong_address_peer_idx = 2;
1927
1928            for (i, peer) in peers.iter().enumerate() {
1929                let peer_context = context.child("peer").with_attribute("index", i);
1930                let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]);
1931
1932                // Peer 2 advertises wrong IP, others advertise correct addresses
1933                let dialable_addr: Ingress = if i == wrong_address_peer_idx {
1934                    SocketAddr::new(wrong_ip, ports[i]).into()
1935                } else {
1936                    listen_addr.into()
1937                };
1938
1939                // Create bootstrappers (everyone connects to peer 0)
1940                let mut bootstrappers = Vec::new();
1941                if i > 0 {
1942                    bootstrappers.push((
1943                        addresses[0].clone(),
1944                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
1945                    ));
1946                }
1947
1948                let mut config =
1949                    Config::test(peer.clone(), listen_addr, bootstrappers, 1_024 * 1_024);
1950                config.dialable = dialable_addr;
1951
1952                let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
1953
1954                oracle.track(0, Set::try_from(addresses.clone()).unwrap());
1955
1956                let (sender, receiver) =
1957                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1958                senders[i] = Some(sender);
1959                receivers[i] = Some(receiver);
1960
1961                let handle = network.start();
1962                handles[i] = Some(handle);
1963            }
1964
1965            // Wait for full connectivity
1966            for (i, sender) in senders.iter_mut().enumerate() {
1967                let sender = sender.as_mut().unwrap();
1968                loop {
1969                    let sent = sender.send(
1970                        Recipients::All,
1971                        peers[i].public_key().as_ref().to_vec(),
1972                        true,
1973                    );
1974                    if sent.len() == n - 1 {
1975                        break;
1976                    }
1977                    context.sleep(Duration::from_millis(100)).await;
1978                }
1979            }
1980
1981            // Verify each peer can receive from all others
1982            for receiver in receivers.iter_mut() {
1983                let receiver = receiver.as_mut().unwrap();
1984                let mut received = HashSet::new();
1985                while received.len() < n - 1 {
1986                    let (sender, message) = receiver.recv().await.unwrap();
1987                    assert_eq!(message, sender.as_ref());
1988                    received.insert(sender);
1989                }
1990            }
1991
1992            // Restart peer 1 with a new port.
1993            // After restart, peer 1 can dial peers 0, 3, and 4 (correct addresses)
1994            // but CANNOT dial peer 2 (wrong address).
1995            // Peer 2 must dial peer 1 to reconnect (after learning peer 1's new address
1996            // through gossip from the bootstrapper).
1997            let restart_peer_idx = 1;
1998            let new_port = base_port + 100;
1999            ports[restart_peer_idx] = new_port;
2000
2001            // Abort the peer's network
2002            if let Some(handle) = handles[restart_peer_idx].take() {
2003                handle.abort();
2004            }
2005            senders[restart_peer_idx] = None;
2006            receivers[restart_peer_idx] = None;
2007
2008            // Restart the peer with a NEW port and CORRECT dialable address
2009            let peer_context = context
2010                .child("peer_restarted")
2011                .with_attribute("index", restart_peer_idx);
2012            let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), new_port);
2013            let bootstrappers = vec![(
2014                addresses[0].clone(),
2015                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
2016            )];
2017
2018            let config = Config::test(
2019                peers[restart_peer_idx].clone(),
2020                listen_addr,
2021                bootstrappers,
2022                1_024 * 1_024,
2023            );
2024
2025            let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
2026
2027            oracle.track(0, Set::try_from(addresses.clone()).unwrap());
2028
2029            let (sender, receiver) =
2030                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2031            senders[restart_peer_idx] = Some(sender);
2032            receivers[restart_peer_idx] = Some(receiver);
2033
2034            let handle = network.start();
2035            handles[restart_peer_idx] = Some(handle);
2036
2037            // Wait for the restarted peer to reconnect to all other peers.
2038            // For peer 2 (wrong address), this MUST happen via:
2039            // 1. Restarted peer dials peer 0 (bootstrapper)
2040            // 2. Peer 0 gossips restarted peer's new address to peer 2
2041            // 3. Peer 2 dials the restarted peer
2042            let restarted_sender = senders[restart_peer_idx].as_mut().unwrap();
2043            loop {
2044                let sent = restarted_sender.send(
2045                    Recipients::All,
2046                    peers[restart_peer_idx].public_key().as_ref().to_vec(),
2047                    true,
2048                );
2049                if sent.len() == n - 1 {
2050                    break;
2051                }
2052                context.sleep(Duration::from_millis(100)).await;
2053            }
2054
2055            // Verify all other peers can send to the restarted peer
2056            for i in 0..n {
2057                if i == restart_peer_idx {
2058                    continue;
2059                }
2060                let sender = senders[i].as_mut().unwrap();
2061                loop {
2062                    let sent = sender.send(
2063                        Recipients::One(addresses[restart_peer_idx].clone()),
2064                        peers[i].public_key().as_ref().to_vec(),
2065                        true,
2066                    );
2067                    if sent.len() == 1 {
2068                        break;
2069                    }
2070                    context.sleep(Duration::from_millis(100)).await;
2071                }
2072            }
2073
2074            // Verify the restarted peer receives from all others
2075            let restarted_receiver = receivers[restart_peer_idx].as_mut().unwrap();
2076            let mut received = HashSet::new();
2077            while received.len() < n - 1 {
2078                let (sender, message) = restarted_receiver.recv().await.unwrap();
2079                assert_eq!(message, sender.as_ref());
2080                received.insert(sender);
2081            }
2082
2083            assert_no_rate_limiting(&context.encode());
2084        });
2085    }
2086
    fn duplicate_addresses_disconnected(seed: u64) {
        let base_port = 6000;
        let executor = deterministic::Runner::seeded(seed);
        executor.start(|context| async move {
            let peer0 = ed25519::PrivateKey::from_seed(0);
            let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
            let wrong_socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 100);
            let peer1 = ed25519::PrivateKey::from_seed(1);
            let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
            let peer2 = ed25519::PrivateKey::from_seed(2);
            let addresses: Vec<_> =
                vec![peer0.public_key(), peer1.public_key(), peer2.public_key()];

            // Start peer 0 with duplicate bootstrapper addresses.
            // Both peer 1 (honest) and peer 2 (spoofed) advertise socket1.
            let config0 = Config::test(
                peer0.clone(),
                socket0,
                vec![
                    (peer1.public_key(), socket1.into()),
                    (peer2.public_key(), socket1.into()),
                ],
                MAX_MESSAGE_SIZE,
            );
            let (mut network0, mut oracle0) =
                Network::new(context.child("peer").with_attribute("index", 0), config0);
            let (mut sender0, _receiver0) =
                network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            network0.start();

            oracle0.track(0, Set::try_from(addresses.clone()).unwrap());

            // Wait for connection attempts.
            context.sleep(Duration::from_secs(30)).await;

            // Peer 0 can't send to anyone yet.
            let checked = sender0.check(Recipients::All).unwrap();
            assert!(checked.recipients().is_empty());

            // Start peer 1 with a wrong bootstrapper address for peer 0 so peer 0 must dial peer 1.
            let config1 = Config::test(
                peer1.clone(),
                socket1,
                vec![(peer0.public_key(), wrong_socket0.into())],
                MAX_MESSAGE_SIZE,
            );
            let (mut network1, mut oracle1) =
                Network::new(context.child("peer").with_attribute("index", 1), config1);
            let (_sender1, mut receiver1) =
                network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            network1.start();

            oracle1.track(0, Set::try_from(addresses).unwrap());

            // Wait for connections to be made.
            context.sleep(Duration::from_secs(30)).await;

            // Peer 0 should eventually connect to the honest peer 1 at socket1.
            loop {
                let sent =
                    sender0.send(Recipients::All, peer0.public_key().as_ref().to_vec(), true);
                if sent.len() == 1 {
                    assert_eq!(sent[0], peer1.public_key());
                    break;
                }
                context.sleep(Duration::from_millis(100)).await;
            }
            let (sender, _) = receiver1.recv().await.unwrap();
            assert_eq!(sender, peer0.public_key());
        });
    }

    #[test_traced]
    fn test_duplicate_addresses_disconnected() {
        // Ensure different dial orders are explored by running with different seeds.
        for seed in 0..25 {
            duplicate_addresses_disconnected(seed);
        }
    }

    #[test_traced]
    fn test_duplicate_addresses_connected() {
        let base_port = 6000;
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let peer0 = ed25519::PrivateKey::from_seed(0);
            let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
            let wrong_socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 100);
            let peer1 = ed25519::PrivateKey::from_seed(1);
            let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
            let peer2 = ed25519::PrivateKey::from_seed(2);
            let socket2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 2);
            let addresses: Vec<_> =
                vec![peer0.public_key(), peer1.public_key(), peer2.public_key()];

            // Start peer 0 with duplicate bootstrapper addresses.
            // Both peer 1 (honest) and peer 2 (spoofed) advertise socket1.
            let config0 = Config::test(
                peer0.clone(),
                socket0,
                vec![
                    (peer1.public_key(), socket1.into()),
                    (peer2.public_key(), socket1.into()),
                ],
                MAX_MESSAGE_SIZE,
            );
            let (mut network0, mut oracle0) =
                Network::new(context.child("peer").with_attribute("index", 0), config0);
            let (mut sender0, mut receiver0) =
                network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            network0.start();

            // Start peer 2 with a spoofed advertised address (socket1) but listening on socket2.
            // This keeps peer 0 and peer 2 connected while preserving the duplicate-address condition.
            let mut config2 = Config::test(
                peer2.clone(),
                socket2,
                vec![(peer0.public_key(), socket0.into())],
                MAX_MESSAGE_SIZE,
            );
            config2.dialable = socket1.into();
            let (mut network2, mut oracle2) =
                Network::new(context.child("peer").with_attribute("index", 2), config2);
            let (_sender2, mut receiver2) =
                network2.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            network2.start();

            oracle0.track(0, Set::try_from(addresses.clone()).unwrap());
            oracle2.track(0, Set::try_from(addresses.clone()).unwrap());

            // Wait for initial connections.
            context.sleep(Duration::from_secs(30)).await;

            // Peer 0 can send to peer 2.
            loop {
                let sent =
                    sender0.send(Recipients::All, peer2.public_key().as_ref().to_vec(), true);
                if sent.len() == 1 {
                    assert_eq!(sent[0], peer2.public_key());
                    break;
                }
                context.sleep(Duration::from_millis(100)).await;
            }
            let (sender, _) = receiver2.recv().await.unwrap();
            assert_eq!(sender, peer0.public_key());

            // Start peer 1 (honest peer on the duplicated address).
            // Use a wrong bootstrapper for peer 0 so peer 1 doesn't connect first.
            let config1 = Config::test(
                peer1.clone(),
                socket1,
                vec![(peer0.public_key(), wrong_socket0.into())],
                MAX_MESSAGE_SIZE,
            );
            let (mut network1, mut oracle1) =
                Network::new(context.child("peer").with_attribute("index", 1), config1);
            let (mut sender1, _receiver1) =
                network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            network1.start();

            oracle1.track(0, Set::try_from(addresses).unwrap());

            // Wait for full connectivity to peer 1.
            context.sleep(Duration::from_secs(30)).await;

            // Peer 1 should eventually connect to both peer 0 and peer 2.
            loop {
                let sent =
                    sender1.send(Recipients::All, peer1.public_key().as_ref().to_vec(), true);
                if sent.len() == 2 {
                    assert!(sent.contains(&peer0.public_key()));
                    assert!(sent.contains(&peer2.public_key()));
                    break;
                }
                context.sleep(Duration::from_millis(100)).await;
            }

            let mut received0 = false;
            while let Ok((sender, _)) = receiver0.recv().await {
                // May have buffered messages from the initial connectivity check.
                if sender == peer1.public_key() {
                    received0 = true;
                    break;
                }
            }
            assert!(received0);
            let (sender, _) = receiver2.recv().await.unwrap();
            assert_eq!(sender, peer1.public_key());
        });
    }

    #[test_traced]
    fn test_operations_after_shutdown_do_not_panic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let peer = ed25519::PrivateKey::from_seed(0);
            let address = peer.public_key();

            let peer_context = context.child("peer");
            let config = Config::test(
                peer.clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000),
                vec![],
                MAX_MESSAGE_SIZE,
            );
            let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);

            // Register channel and peer set
            let (mut sender, _receiver) =
                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            let peers: Set<ed25519::PublicKey> = vec![address.clone()].try_into().unwrap();
            oracle.track(0, peers.clone());

            // Start and immediately abort the network
            let handle = network.start();
            handle.abort();

            // Wait for shutdown to propagate
            context.sleep(Duration::from_millis(100)).await;

            // Oracle operations should not panic even after shutdown
            oracle.track(1, peers.clone());
            let _ = oracle.peer_set(0).await;
            let _ = oracle.subscribe().await;
            crate::block_peer(&mut oracle, address.clone());

            // Sender operations should not panic even after shutdown
            let _ = sender.send(Recipients::All, address.as_ref().to_vec(), true);
        });
    }

    fn clean_shutdown(seed: u64) {
        let cfg = deterministic::Config::default()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(30)));
        let executor = deterministic::Runner::new(cfg);
        executor.start(|context| async move {
            let peer = ed25519::PrivateKey::from_seed(0);

            let peer_context = context.child("peer");
            let config = Config::test(
                peer.clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000),
                vec![],
                MAX_MESSAGE_SIZE,
            );
            let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);

            // Register channel and peer set
            let (_, _) =
                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            let peers: Set<ed25519::PublicKey> = vec![peer.public_key()].try_into().unwrap();
            oracle.track(0, peers);

            // Start the network
            let handle = network.start();

            // Allow tasks to start
            context.sleep(Duration::from_millis(100)).await;

            // Count running tasks under the network prefix
            let running_before = count_running_tasks(&context, "peer_network");
            assert!(
                running_before > 0,
                "at least one network task should be running"
            );

            // Abort the network
            handle.abort();
            let _ = handle.await;

            // Give the runtime a tick to process aborts
            context.sleep(Duration::from_millis(100)).await;

            // Verify all network tasks are stopped
            let running_after = count_running_tasks(&context, "peer_network");
            assert_eq!(
                running_after, 0,
                "all network tasks should be stopped, but {running_after} still running"
            );
        });
    }

    #[test_traced]
    fn test_clean_shutdown() {
        for seed in 0..25 {
            clean_shutdown(seed);
        }
    }

    #[test]
    fn test_broadcast_slow_peer_no_blocking() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create router
            let cfg = RouterConfig {
                mailbox_size: NZUsize!(1),
            };
            let (router, mailbox, messenger) =
                RouterActor::<_, ed25519::PublicKey>::new(context.child("router"), cfg);

            // Create channels for the router
            let channels = channels::Channels::new(messenger.clone(), MAX_MESSAGE_SIZE);
            let _handle = router.start(channels);

            // Register the slow peer with a small relay buffer and keep its
            // receivers alive without draining them.
            let slow_peer = ed25519::PrivateKey::from_seed(0).public_key();
            let (slow_relay, _slow_receivers) =
                Relay::new(context.child("slow_relay"), NZUsize!(10));
            assert!(
                mailbox.ready(slow_peer.clone(), slow_relay).await.is_some(),
                "Failed to register slow peer"
            );

            // Register the fast peer with a large relay buffer
            let fast_peer = ed25519::PrivateKey::from_seed(1).public_key();
            let (fast_relay, mut fast_receivers) =
                Relay::new(context.child("fast_relay"), NZUsize!(100));
            assert!(
                mailbox.ready(fast_peer.clone(), fast_relay).await.is_some(),
                "Failed to register fast peer"
            );

            let message = IoBuf::from(vec![0u8; 100]);

            // Send enough messages to fill slow_peer's buffer and then force
            // one slow-peer drop. The fast peer should still receive every
            // broadcast.
            for i in 0..11 {
                let sent = messenger.content(Recipients::All, 0, message.clone().into(), false);
                assert_ne!(
                    sent,
                    Unreliable::new(Feedback::Closed),
                    "Broadcast {i} should be accepted"
                );

                assert!(fast_receivers.low.recv().await.is_some());
            }
        });
    }
}