//! Communicate with a fixed set of authenticated peers without known addresses over encrypted connections.
//!
//! `discovery` provides multiplexed communication between fully-connected peers
//! identified by a developer-specified cryptographic identity (e.g. BLS or ed25519).
//! Peer discovery occurs automatically using ordered bit vectors (sorted by authorized
//! cryptographic identities) to efficiently communicate knowledge of dialable peers.
//!
//! # Features
//!
//! - Configurable Cryptography Scheme for Peer Identities (BLS, ed25519, etc.)
//! - Automatic Peer Discovery Using Bit Vectors (Also Used as Ping Messages)
//! - Multiplexing With Configurable Rate Limiting Per Channel and Send Prioritization
//!
//! # Design
//!
//! ## Discovery
//!
//! Peer discovery operates under the assumption that all peers are aware of and synchronized on
//! the composition of peer sets at specific, user-provided indices (`u64`). Each index maps to a
//! list of authorized `PublicKey`s (`(u64, Vec<PublicKey>)`). Based on this shared knowledge, each
//! peer can construct a sorted bit vector message (`BitVec`) representing its knowledge of the
//! dialable addresses ([std::net::SocketAddr]) of the peers in that set.
//! The `BitVec` message contains:
//! - `index`: The `u64` index the bit vector applies to.
//! - `bits`: The bit vector itself, where a '1' signifies knowledge of the corresponding
//!   peer's address in the sorted list for that index.
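//!
//! As a rough illustration (not this crate's `BitVec` type), the `bits` for one index could be
//! derived as below. This sketch assumes the set is a sorted slice of keys and that address
//! knowledge lives in a `HashMap`. `PeerKey` and `build_bits` are hypothetical names.
//!
//! ```rust
//! use std::collections::HashMap;
//! use std::net::SocketAddr;
//!
//! // Hypothetical stand-in for a peer's public key.
//! type PeerKey = [u8; 32];
//!
//! // Bit `i` is `true` iff we know a dialable address for the `i`-th key of the sorted set.
//! fn build_bits(sorted_set: &[PeerKey], known: &HashMap<PeerKey, SocketAddr>) -> Vec<bool> {
//!     // Every peer must sort the set identically, or bit positions will not line up.
//!     debug_assert!(sorted_set.windows(2).all(|w| w[0] < w[1]), "set must be sorted");
//!     sorted_set.iter().map(|key| known.contains_key(key)).collect()
//! }
//! ```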
//!
//! Even if a peer knows an address for another peer in the set, it will assume that information
//! is stale after failing to dial that peer a (configurable) number of times and mark it as '0'
//! (unknown) in the bit vector. This prevents peers from getting "stuck" with outdated
//! information about other peers.
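//!
//! A minimal sketch of that staleness rule, assuming a simple consecutive-failure counter. The
//! `AddressRecord` type and its `max_failures` field are hypothetical stand-ins for the
//! configurable limit, not this crate's tracker internals.
//!
//! ```rust
//! use std::net::SocketAddr;
//!
//! // Hypothetical per-peer record: a known address plus consecutive failed dials.
//! struct AddressRecord {
//!     addr: SocketAddr,
//!     failures: u32,
//!     max_failures: u32,
//! }
//!
//! impl AddressRecord {
//!     fn new(addr: SocketAddr, max_failures: u32) -> Self {
//!         Self { addr, failures: 0, max_failures }
//!     }
//!
//!     // A successful dial shows the address still works, so reset the counter.
//!     fn dial_succeeded(&mut self) {
//!         self.failures = 0;
//!     }
//!
//!     fn dial_failed(&mut self) {
//!         self.failures += 1;
//!     }
//!
//!     // The address to advertise, or `None` ('0' in the bit vector) once deemed stale.
//!     fn advertised(&self) -> Option<SocketAddr> {
//!         (self.failures < self.max_failures).then_some(self.addr)
//!     }
//! }
//! ```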
//!
//! _Warning: If peers are not synchronized on the peer set composition at a given index,
//! discovery messages can be misinterpreted. A peer might associate a bit in the vector with the
//! wrong peer or fail to parse the vector if its length doesn't match the expected set size. The
//! application layer is responsible for ensuring peer set synchronization._
//!
//! Due to their small size, these `BitVec` messages are exchanged periodically (configured by
//! `gossip_bit_vec_frequency` in the [Config]) between connected peers. This serves as both a
//! peer discovery mechanism and a keep-alive "ping" message to maintain the underlying
//! connection, especially during periods of low application-level traffic. The protocol supports
//! tracking multiple peer sets concurrently (up to `tracked_peer_sets`), each identified by its
//! `index`. This is useful, for instance, during transitions like distributed key generation
//! (DKG), where connections to both old and new peer sets are needed simultaneously.
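//!
//! The bookkeeping for multiple peer sets might look like the sketch below, which assumes the
//! oldest (lowest) index is evicted once more than `tracked_peer_sets` sets are registered.
//! `PeerSets` is a hypothetical type, not this crate's `Oracle` or tracker.
//!
//! ```rust
//! use std::collections::BTreeMap;
//!
//! // Hypothetical tracker keeping at most `max_sets` peer sets, keyed by index.
//! struct PeerSets<K> {
//!     max_sets: usize,
//!     sets: BTreeMap<u64, Vec<K>>,
//! }
//!
//! impl<K: Ord> PeerSets<K> {
//!     fn new(max_sets: usize) -> Self {
//!         Self { max_sets, sets: BTreeMap::new() }
//!     }
//!
//!     // Sort and dedup so every peer derives the same bit positions, then evict
//!     // the lowest indices until at most `max_sets` sets remain.
//!     fn update(&mut self, index: u64, mut peers: Vec<K>) {
//!         peers.sort();
//!         peers.dedup();
//!         self.sets.insert(index, peers);
//!         while self.sets.len() > self.max_sets {
//!             self.sets.pop_first();
//!         }
//!     }
//! }
//! ```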
//!
//! Upon receiving a `BitVec` message, a peer compares it against its own knowledge for the same
//! index. If the receiving peer knows addresses that the sender marked as '0' (unknown), it
//! selects a random subset of these known `Info` structures (up to `peer_gossip_max_count`)
//! and sends them back in a `Payload::Peers` message. To save bandwidth, peers only gossip
//! `Info` for peers they are currently connected to. This prevents them from repeatedly
//! sending `Info` that they cannot verify is still valid. Each `Info` contains:
//! - `socket`: The [std::net::SocketAddr] of the peer.
//! - `timestamp`: A `u64` timestamp indicating when the address was attested.
//! - `public_key`: The peer's public key.
//! - `signature`: The peer's cryptographic signature over the `socket` and `timestamp`.
//!
//! If the receiver doesn't know any addresses the sender is unaware of, it sends no
//! `Payload::Peers` response; the received `BitVec` implicitly acts as a "pong".
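//!
//! The response logic can be sketched as follows. This sketch assumes peers are identified by
//! `u32` keys and uses a tiny xorshift generator as a std-only stand-in for a real RNG. `Info`
//! here is a trimmed, hypothetical version of the structure above, and `select_gossip` is not an
//! API of this crate. An empty result corresponds to sending no `Payload::Peers` reply.
//!
//! ```rust
//! use std::collections::{HashMap, HashSet};
//!
//! // Trimmed, hypothetical `Info` (the real one also carries `socket` and `signature`).
//! #[derive(Clone, Debug, PartialEq)]
//! struct Info {
//!     public_key: u32,
//!     timestamp: u64,
//! }
//!
//! // xorshift64: a std-only stand-in for a real random number generator.
//! fn next_rand(state: &mut u64) -> u64 {
//!     *state ^= *state << 13;
//!     *state ^= *state >> 7;
//!     *state ^= *state << 17;
//!     *state
//! }
//!
//! // Pick up to `max_count` `Info`s for peers the sender marked '0' that we
//! // both know and are currently connected to.
//! fn select_gossip(
//!     sorted_set: &[u32],
//!     sender_bits: &[bool],
//!     known: &HashMap<u32, Info>,
//!     connected: &HashSet<u32>,
//!     max_count: usize,
//!     seed: u64,
//! ) -> Vec<Info> {
//!     let mut candidates = Vec::new();
//!     for (key, &bit) in sorted_set.iter().zip(sender_bits) {
//!         if bit || !connected.contains(key) {
//!             continue;
//!         }
//!         if let Some(info) = known.get(key) {
//!             candidates.push(info.clone());
//!         }
//!     }
//!
//!     // Partial Fisher-Yates shuffle: randomize only the first `take` slots.
//!     let take = max_count.min(candidates.len());
//!     let mut state = seed | 1; // xorshift state must be non-zero
//!     for i in 0..take {
//!         let j = i + (next_rand(&mut state) % (candidates.len() - i) as u64) as usize;
//!         candidates.swap(i, j);
//!     }
//!     candidates.truncate(take);
//!     candidates
//! }
//! ```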
//!
//! If a peer receives an `Info` message (either directly or through gossip) containing a more
//! recent timestamp for a known peer's address, it updates its local `Record`. This updated
//! `Info` is also used in future gossip messages. Each peer generates its own signed
//! `Info` upon startup and sends it immediately after establishing a connection (following
//! the cryptographic handshake). This ensures that if a peer connects using an outdated address
//! record, it will be corrected promptly by the peer being dialed.
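//!
//! A minimal sketch of that update rule follows. It assumes the `Info` has already passed
//! signature verification and that peers are keyed by a `u32`. `apply_info` and this trimmed
//! `Info` are hypothetical. Ties are rejected, so only a strictly newer timestamp replaces the
//! stored record.
//!
//! ```rust
//! use std::collections::HashMap;
//! use std::net::SocketAddr;
//!
//! // Hypothetical, already signature-verified address attestation.
//! #[derive(Clone, Debug, PartialEq)]
//! struct Info {
//!     public_key: u32,
//!     socket: SocketAddr,
//!     timestamp: u64,
//! }
//!
//! // Store `info` only if it is newer than what we hold for that peer.
//! // Returns `true` if the local record changed (and may be gossiped later).
//! fn apply_info(records: &mut HashMap<u32, Info>, info: Info) -> bool {
//!     let is_newer = records
//!         .get(&info.public_key)
//!         .map_or(true, |existing| info.timestamp > existing.timestamp);
//!     if is_newer {
//!         records.insert(info.public_key, info);
//!     }
//!     is_newer
//! }
//! ```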
//!
//! To initiate the discovery process, a peer needs a list of `bootstrappers` (defined in
//! [Config]): known peer public keys and their corresponding socket addresses. The peer
//! attempts to dial these bootstrappers, performs the handshake, sends its own `Info`, and
//! then sends a `BitVec` for the relevant peer set(s) (initially only knowing its own address,
//! marked as '1'). It then waits for responses, learning about other peers through the
//! `Payload::Peers` messages it receives. Bootstrapper information is persisted, and connections
//! to them are maintained even if they aren't part of any currently tracked peer sets. Different
//! peers can have different bootstrapper lists.
//!
//! _Note: If a peer (listener) receives a connection request from another peer (dialer) that
//! belongs to a registered peer set, the listener will accept the connection, even if the
//! listener itself hasn't yet learned about that specific peer set (or has an older version). The
//! core requirement is that the listener recognizes the *dialer's public key* as belonging to
//! *some* authorized set it tracks (see `actors::tracker::Actor`). This mechanism allows peers
//! with more up-to-date peer set information to connect and propagate that information, enabling
//! the listener to potentially learn about newer sets it is part of._
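//!
//! The acceptance check described in the note reduces to a membership test over every tracked
//! set, as in this hypothetical sketch. `accept_dialer` is not this crate's API, and the sketch
//! assumes each tracked set is stored sorted.
//!
//! ```rust
//! use std::collections::BTreeMap;
//!
//! // Accept an inbound (post-handshake) connection if the dialer's key appears in
//! // *any* tracked set. No set index is involved: the check is purely on the key.
//! fn accept_dialer<K: Ord>(tracked: &BTreeMap<u64, Vec<K>>, dialer: &K) -> bool {
//!     // Sets are assumed sorted, so a binary search suffices.
//!     tracked.values().any(|set| set.binary_search(dialer).is_ok())
//! }
//! ```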
//!
//! ## Messages
//!
//! Application-level data is exchanged using the `Payload::Data` message type, which wraps an
//! internal `Data` structure. This structure contains:
//! - `channel`: A `u32` identifier used to route the message to the correct application handler.
//! - `message`: The arbitrary application payload as `Bytes`.
//!
//! The size of the `message` bytes must not exceed the configured
//! `max_message_size`. If it does, the send operation will fail with
//! [Error::MessageTooLarge]. Messages can be sent with `priority`, allowing certain
//! communications to potentially bypass lower-priority messages waiting in send queues across all
//! channels. Each registered channel ([Sender], [Receiver]) handles its own message queuing
//! and rate limiting.
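//!
//! The size check and priority behavior can be pictured with two per-connection queues, as in
//! this sketch. `Outbound` and `SendError` are hypothetical (the latter mirrors
//! [Error::MessageTooLarge]), and the crate's actual queuing and scheduling may differ.
//!
//! ```rust
//! use std::collections::VecDeque;
//!
//! // Hypothetical mirror of `Error::MessageTooLarge(usize)`.
//! #[derive(Debug, PartialEq)]
//! enum SendError {
//!     MessageTooLarge(usize),
//! }
//!
//! // Hypothetical per-connection outbound queues: priority and normal.
//! #[derive(Default)]
//! struct Outbound {
//!     high: VecDeque<Vec<u8>>,
//!     low: VecDeque<Vec<u8>>,
//! }
//!
//! impl Outbound {
//!     // Reject oversized payloads before queuing; otherwise queue by priority.
//!     fn enqueue(&mut self, message: &[u8], priority: bool, max_size: u32) -> Result<(), SendError> {
//!         if message.len() > max_size as usize {
//!             return Err(SendError::MessageTooLarge(message.len()));
//!         }
//!         let queue = if priority { &mut self.high } else { &mut self.low };
//!         queue.push_back(message.to_vec());
//!         Ok(())
//!     }
//!
//!     // Any queued priority message is written before any normal one.
//!     fn next_to_send(&mut self) -> Option<Vec<u8>> {
//!         self.high.pop_front().or_else(|| self.low.pop_front())
//!     }
//! }
//! ```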
//!
//! ## Compression
//!
//! Stream compression is not provided at the transport layer to avoid inadvertently
//! enabling known attacks such as BREACH and CRIME. These attacks exploit the interaction
//! between compression and encryption by analyzing patterns in the resulting data.
//! When secrets are compressed alongside attacker-controlled content, these attacks can infer
//! sensitive information by analyzing compression ratios. Applications that choose
//! to compress data should do so with full awareness of these risks and implement
//! appropriate mitigations (such as ensuring no attacker-controlled data is compressed
//! alongside sensitive information).
//!
//! ## Rate Limiting
//!
//! There are five primary rate limits:
//!
//! - `max_concurrent_handshakes`: The maximum number of concurrent handshake attempts allowed.
//! - `allowed_handshake_rate_per_ip`: The rate limit for handshake attempts originating from a single IP address.
//! - `allowed_handshake_rate_per_subnet`: The rate limit for handshake attempts originating from a single IP subnet.
//! - `allowed_connection_rate_per_peer`: The rate limit for connections to a single peer (incoming or outgoing).
//! - `rate` (per channel): The rate limit for messages sent on a single channel.
//!
//! _Users should consider these rate limits as best-effort protection against moderate abuse. Targeted abuse (e.g. DDoS)
//! must be mitigated with an external proxy (that limits inbound connection attempts to authorized IPs)._
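//!
//! For intuition, a per-IP handshake limit can be modeled as a token bucket keyed by address.
//! This is a hypothetical sketch, with time passed in explicitly to keep it deterministic. It is
//! not the limiter behind `allowed_handshake_rate_per_ip` or `Quota`, whose algorithm may differ.
//!
//! ```rust
//! use std::collections::HashMap;
//! use std::net::IpAddr;
//! use std::time::Duration;
//!
//! // One bucket per IP: `tokens` refill over time, capped at the burst size.
//! struct Bucket {
//!     tokens: f64,
//!     last: Duration, // time of last refill, from an arbitrary epoch
//! }
//!
//! // Hypothetical limiter: `rate_per_sec` tokens per second, up to `burst`.
//! struct PerIpLimiter {
//!     rate_per_sec: f64,
//!     burst: f64,
//!     buckets: HashMap<IpAddr, Bucket>,
//! }
//!
//! impl PerIpLimiter {
//!     fn new(rate_per_sec: f64, burst: f64) -> Self {
//!         Self { rate_per_sec, burst, buckets: HashMap::new() }
//!     }
//!
//!     // Returns `true` if a handshake from `ip` at time `now` is allowed.
//!     fn check(&mut self, ip: IpAddr, now: Duration) -> bool {
//!         let bucket = self
//!             .buckets
//!             .entry(ip)
//!             .or_insert(Bucket { tokens: self.burst, last: now });
//!         let elapsed = now.saturating_sub(bucket.last).as_secs_f64();
//!         bucket.tokens = (bucket.tokens + elapsed * self.rate_per_sec).min(self.burst);
//!         bucket.last = now;
//!         if bucket.tokens >= 1.0 {
//!             bucket.tokens -= 1.0;
//!             true
//!         } else {
//!             false
//!         }
//!     }
//! }
//! ```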
//!
//! ## Message Delivery
//!
//! Outgoing messages are dropped when a peer's send buffer is full, preventing slow peers
//! from blocking sends to other peers. Incoming messages are dropped when the application's
//! receive buffer is full, ensuring protocol messages (BitVec, Peers) continue to flow and
//! connections remain healthy.
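//!
//! Both drop paths can be approximated with a bounded channel and a non-blocking send. This
//! sketch uses std's `sync_channel` for illustration. `deliver_or_drop` is a hypothetical
//! helper, and the crate's own buffers and drop accounting may differ.
//!
//! ```rust
//! use std::sync::mpsc::{SyncSender, TrySendError};
//!
//! // Try a non-blocking enqueue. If the bounded buffer is full, drop the message
//! // (and count the drop) instead of stalling the caller.
//! fn deliver_or_drop(tx: &SyncSender<Vec<u8>>, msg: Vec<u8>, dropped: &mut u64) -> bool {
//!     match tx.try_send(msg) {
//!         Ok(()) => true,
//!         Err(TrySendError::Full(_)) => {
//!             *dropped += 1;
//!             false
//!         }
//!         // The receiving side is gone, so the message is not delivered either.
//!         Err(TrySendError::Disconnected(_)) => false,
//!     }
//! }
//! ```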
//!
//! # Example
//!
//! ```rust
//! use commonware_p2p::{authenticated::discovery::{self, Network}, Ingress, Manager, Sender, Recipients};
//! use commonware_cryptography::{ed25519, Signer, PrivateKey as _, PublicKey as _};
//! use commonware_runtime::{deterministic, Metrics, Quota, Runner, Spawner};
//! use commonware_utils::NZU32;
//! use std::net::{IpAddr, Ipv4Addr, SocketAddr};
//!
//! // Configure context
//! let runtime_cfg = deterministic::Config::default();
//! let runner = deterministic::Runner::new(runtime_cfg);
//!
//! // Generate identity
//! //
//! // In production, the signer should be generated from a secure source of entropy.
//! let signer = ed25519::PrivateKey::from_seed(0);
//!
//! // Generate peers
//! //
//! // In production, peer identities will be provided by some external source of truth
//! // (like the staking set of a blockchain).
//! let peer1 = ed25519::PrivateKey::from_seed(1).public_key();
//! let peer2 = ed25519::PrivateKey::from_seed(2).public_key();
//! let peer3 = ed25519::PrivateKey::from_seed(3).public_key();
//!
//! // Configure bootstrappers
//! //
//! // In production, it is likely that the address of bootstrappers will be some public address.
//! let bootstrapper_addr: Ingress = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3001).into();
//! let bootstrappers = vec![(peer1.clone(), bootstrapper_addr)];
//!
//! // Configure namespace
//! //
//! // In production, use a unique application namespace to prevent cryptographic replay attacks.
//! let application_namespace = b"my-app-namespace";
//!
//! // Configure network
//! //
//! // In production, use a more conservative configuration like `Config::recommended`.
//! const MAX_MESSAGE_SIZE: u32 = 1_024; // 1KB
//! let p2p_cfg = discovery::Config::local(
//!     signer.clone(),
//!     application_namespace,
//!     SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 3000),
//!     SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3000), // Use a specific dialable addr
//!     bootstrappers,
//!     MAX_MESSAGE_SIZE,
//! );
//!
//! // Start context
//! runner.start(|context| async move {
//!     // Initialize network
//!     let (mut network, mut oracle) = Network::new(context.with_label("network"), p2p_cfg);
//!
//!     // Register authorized peers
//!     //
//!     // In production, this would be updated as new peer sets are created (like when
//!     // the composition of a validator set changes).
//!     oracle.update(0, [signer.public_key(), peer1, peer2, peer3].try_into().unwrap()).await;
//!
//!     // Register some channel
//!     const MAX_MESSAGE_BACKLOG: usize = 128;
//!     let (mut sender, receiver) = network.register(
//!         0,
//!         Quota::per_second(NZU32!(1)),
//!         MAX_MESSAGE_BACKLOG,
//!     );
//!
//!     // Run network
//!     network.start();
//!
//!     // Example: Use sender
//!     let _ = sender.send(Recipients::All, bytes::Bytes::from_static(b"hello"), false).await;
//!
//!     // Graceful shutdown (stops all spawned tasks)
//!     context.stop(0, None).await.unwrap();
//! });
//! ```

mod actors;
mod channels;
mod config;
mod metrics;
mod network;
mod types;

use thiserror::Error;

/// Errors that can occur when interacting with the network.
#[derive(Error, Debug)]
pub enum Error {
    #[error("message too large: {0}")]
    MessageTooLarge(usize),
    #[error("network closed")]
    NetworkClosed,
}

pub use actors::tracker::Oracle;
pub use channels::{Receiver, Sender};
pub use config::{Bootstrapper, Config};
pub use network::Network;

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        authenticated::{
            discovery::actors::router::{Actor as RouterActor, Config as RouterConfig},
            relay::Relay,
        },
        Blocker, Ingress, Manager, Receiver, Recipients, Sender,
    };
    use bytes::Bytes;
    use commonware_cryptography::{ed25519, Signer as _};
    use commonware_macros::{select, select_loop, test_group, test_traced};
    use commonware_runtime::{
        count_running_tasks, deterministic, tokio, Clock, Handle, Metrics, Network as RNetwork,
        Quota, Resolver, Runner, Spawner,
    };
    use commonware_utils::{hostname, ordered::Set, TryCollect, NZU32};
    use futures::{channel::mpsc, SinkExt, StreamExt};
    use rand_core::{CryptoRngCore, RngCore};
    use std::{
        collections::HashSet,
        net::{IpAddr, Ipv4Addr, SocketAddr},
        time::Duration,
    };

    #[derive(Copy, Clone)]
    enum Mode {
        All,
        Some,
        One,
    }

    const MAX_MESSAGE_SIZE: u32 = 1_024 * 1_024; // 1MB
    const DEFAULT_MESSAGE_BACKLOG: usize = 128;

    /// Ensure no message rate limiting occurred.
    ///
    /// If a message is rate limited, it would be formatted as:
    ///
    /// ```text
    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="data_0"} 1
    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="ping"} 1
    /// ```
    fn assert_no_rate_limiting(context: &impl Metrics) {
        let metrics = context.encode();
        assert!(
            !metrics.contains("messages_rate_limited_total{"),
            "no messages should be rate limited: {metrics}"
        );
    }

    /// Test connectivity between `n` peers.
    ///
    /// We set a unique `base_port` for each test to avoid "address already in use"
    /// errors when tests are run immediately after each other.
    async fn run_network(
        context: impl Spawner + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
        max_message_size: u32,
        base_port: u16,
        n: usize,
        mode: Mode,
    ) {
        // Create peers
        let mut peers = Vec::new();
        for i in 0..n {
            peers.push(ed25519::PrivateKey::from_seed(i as u64));
        }
        let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

        // Create networks
        let (complete_sender, mut complete_receiver) = mpsc::channel(peers.len());
        for (i, peer) in peers.iter().enumerate() {
            // Create peer context
            let context = context.with_label(&format!("peer_{i}"));

            // Derive port
            let port = base_port + i as u16;

            // Create bootstrappers
            let mut bootstrappers = Vec::new();
            if i > 0 {
                bootstrappers.push((
                    addresses[0].clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port).into(),
                ));
            }

            // Create network
            let signer = peer.clone();
            let config = Config::test(
                signer.clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                bootstrappers,
                max_message_size,
            );
            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

            // Register peers
            oracle
                .update(0, addresses.clone().try_into().unwrap())
                .await;

            // Register basic application
            let (mut sender, mut receiver) =
                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);

            // Wait to connect to all peers, and then send messages to everyone
            network.start();

            // Send/Receive messages
            context.with_label("agent").spawn({
                let mut complete_sender = complete_sender.clone();
                let addresses = addresses.clone();
                move |context| async move {
                    // Wait for all peers to send their identity
                    let receiver = context.with_label("receiver").spawn(move |_| async move {
                        // Wait for all peers to send their identity
                        let mut received = HashSet::new();
                        while received.len() < n - 1 {
                            // Ensure message equals sender identity
                            let (sender, message) = receiver.recv().await.unwrap();
                            assert_eq!(sender.as_ref(), message.as_ref());

                            // Add to received set
                            received.insert(sender);
                        }
                        complete_sender.send(()).await.unwrap();

                        // Process messages until all finished (or else sender loops could get stuck as a peer may drop)
                        loop {
                            receiver.recv().await.unwrap();
                        }
                    });

                    // Send identity to all peers
                    let msg = signer.public_key();
                    let sender = context
                        .with_label("sender")
                        .spawn(move |context| async move {
                            // Loop forever to account for unexpected message drops
                            loop {
                                match mode {
                                    Mode::One => {
                                        for (j, recipient) in addresses.iter().enumerate() {
                                            // Don't send message to self
                                            if i == j {
                                                continue;
                                            }

                                            // Loop until success
                                            loop {
                                                let sent = sender
                                                    .send(
                                                        Recipients::One(recipient.clone()),
                                                        msg.as_ref(),
                                                        true,
                                                    )
                                                    .await
                                                    .unwrap();
                                                if sent.len() != 1 {
                                                    context.sleep(Duration::from_millis(100)).await;
                                                    continue;
                                                }
                                                assert_eq!(&sent[0], recipient);
                                                break;
                                            }
                                        }
                                    }
                                    Mode::Some => {
                                        // Get all peers not including self
                                        let mut recipients = addresses.clone();
                                        recipients.remove(i);
                                        recipients.sort();

                                        // Loop until all peer sends successful
                                        loop {
                                            let mut sent = sender
                                                .send(
                                                    Recipients::Some(recipients.clone()),
                                                    msg.as_ref(),
                                                    true,
                                                )
                                                .await
                                                .unwrap();
                                            if sent.len() != n - 1 {
                                                context.sleep(Duration::from_millis(100)).await;
                                                continue;
                                            }

                                            // Compare to expected
                                            sent.sort();
                                            assert_eq!(sent, recipients);
                                            break;
                                        }
                                    }
                                    Mode::All => {
                                        // Get all peers not including self
                                        let mut recipients = addresses.clone();
                                        recipients.remove(i);
                                        recipients.sort();

                                        // Loop until all peer sends successful
                                        loop {
                                            let mut sent = sender
                                                .send(Recipients::All, msg.as_ref(), true)
                                                .await
                                                .unwrap();
                                            if sent.len() != n - 1 {
                                                context.sleep(Duration::from_millis(100)).await;
                                                continue;
                                            }

                                            // Compare to expected
                                            sent.sort();
                                            assert_eq!(sent, recipients);
                                            break;
                                        }
                                    }
                                };

                                // Sleep to avoid busy loop
                                context.sleep(Duration::from_secs(10)).await;
                            }
                        });

                    // Neither task should exit
                    select! {
                        receiver = receiver => {
                            panic!("receiver exited: {receiver:?}");
                        },
                        sender = sender => {
                            panic!("sender exited: {sender:?}");
                        },
                    }
                }
            });
        }

        // Wait for all peers to finish
        for _ in 0..n {
            complete_receiver.next().await.unwrap();
        }

        // Ensure no message rate limiting occurred
        assert_no_rate_limiting(&context);
    }

    fn run_deterministic_test(seed: u64, mode: Mode) {
        // Configure test
        const NUM_PEERS: usize = 25;
        const BASE_PORT: u16 = 3000;

        // Run first instance
        let executor = deterministic::Runner::seeded(seed);
        let state = executor.start(|context| async move {
            run_network(
                context.clone(),
                MAX_MESSAGE_SIZE,
                BASE_PORT,
                NUM_PEERS,
                mode,
            )
            .await;
            context.auditor().state()
        });

        // Compare result to second instance
        let executor = deterministic::Runner::seeded(seed);
        let state2 = executor.start(|context| async move {
            run_network(
                context.clone(),
                MAX_MESSAGE_SIZE,
                BASE_PORT,
                NUM_PEERS,
                mode,
            )
            .await;
            context.auditor().state()
        });
        assert_eq!(state, state2);
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_determinism_one() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::One);
        }
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_determinism_some() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::Some);
        }
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_determinism_all() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::All);
        }
    }

    #[test_traced]
    fn test_tokio_connectivity() {
        let executor = tokio::Runner::default();
        executor.start(|context| async move {
            let base_port = 3000;
            let n = 10;
            run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
        });
    }

    #[test_traced]
    fn test_multi_index_oracle() {
        // Configure test
        let base_port = 3000;
        let n: usize = 100;

        // Initialize context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Create networks
            let mut waiters = Vec::new();
            for (i, peer) in peers.iter().enumerate() {
                // Create peer context
                let context = context.with_label(&format!("peer_{i}"));

                // Derive port
                let port = base_port + i as u16;

                // Create bootstrappers
                let mut bootstrappers = Vec::new();
                if i > 0 {
                    bootstrappers.push((
                        addresses[0].clone(),
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port).into(),
                    ));
                }

                // Create network
                let signer = peer.clone();
                let config = Config::test(
                    signer.clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                    bootstrappers,
                    1_024 * 1_024, // 1MB
                );
                let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

                // Register peers at separate indices
                oracle
                    .update(0, [addresses[0].clone()].try_into().unwrap())
                    .await;
                oracle
                    .update(
                        1,
                        [addresses[1].clone(), addresses[2].clone()]
                            .try_into()
                            .unwrap(),
                    )
                    .await;
                oracle
                    .update(2, addresses.iter().skip(2).cloned().try_collect().unwrap())
                    .await;

                // Register basic application
                let (mut sender, mut receiver) =
                    network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);

                // Wait to connect to all peers, and then send messages to everyone
                network.start();

                // Send/Receive messages
                let handler = context
                    .with_label("agent")
                    .spawn(move |context| async move {
                        if i == 0 {
                            // Loop until success
                            let msg = signer.public_key();
                            loop {
                                if sender
                                    .send(Recipients::All, msg.as_ref(), true)
                                    .await
                                    .unwrap()
                                    .len()
                                    == n - 1
                                {
                                    break;
                                }

                                // Sleep and try again (avoid busy loop)
                                context.sleep(Duration::from_millis(100)).await;
                            }
                        } else {
                            // Ensure message equals sender identity
                            let (sender, message) = receiver.recv().await.unwrap();
                            assert_eq!(sender.as_ref(), message.as_ref());
                        }
                    });

                // Add to waiters
                waiters.push(handler);
            }

            // Wait for waiters to finish (receiver before sender)
            for waiter in waiters.into_iter().rev() {
                waiter.await.unwrap();
            }

            // Ensure no message rate limiting occurred
            assert_no_rate_limiting(&context);
        });
    }

    #[test_traced]
659    fn test_message_too_large() {
660        // Configure test
661        let base_port = 3000;
662        let n: usize = 2;
663
664        // Initialize context
665        let executor = deterministic::Runner::seeded(0);
666        executor.start(|mut context| async move {
667            // Create peers
668            let mut peers = Vec::new();
669            for i in 0..n {
670                peers.push(ed25519::PrivateKey::from_seed(i as u64));
671            }
672            let addresses: Set<_> = peers.iter().map(|p| p.public_key()).try_collect().unwrap();
673
674            // Create network
675            let signer = peers[0].clone();
676            let config = Config::test(
677                signer.clone(),
678                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
679                Vec::new(),
680                1_024 * 1_024, // 1MB
681            );
682            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
683
684            // Register peers
685            oracle.update(0, addresses.clone()).await;
686
687            // Register basic application
688            let (mut sender, _) =
689                network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
690
691            // Wait to connect to all peers, and then send messages to everyone
692            network.start();
693
694            // Crate random message
695            let mut msg = vec![0u8; 10 * 1024 * 1024]; // 10MB (greater than frame capacity)
696            context.fill_bytes(&mut msg[..]);
697
698            // Send message
699            let recipient = Recipients::One(addresses[1].clone());
700            let result = sender.send(recipient, &msg[..], true).await;
701            assert!(matches!(result, Err(Error::MessageTooLarge(_))));
702        });
703    }
704
    #[test_traced]
    fn test_rate_limiting() {
        // Configure test
        let base_port = 3000;
        let n: usize = 2;

        // Initialize context
        let executor = deterministic::Runner::seeded(0);
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
            let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
            let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);

            // Create network for peer 0
            let signer0 = peers[0].clone();
            let config0 = Config::test(
                signer0.clone(),
                socket0,
                vec![(peers[1].public_key(), socket1.into())],
                1_024 * 1_024, // 1MB
            );
            let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
            oracle0
                .update(0, addresses.clone().try_into().unwrap())
                .await;
            let (mut sender0, _receiver0) =
                network0.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
            network0.start();

            // Create network for peer 1
            let signer1 = peers[1].clone();
            let config1 = Config::test(
                signer1.clone(),
                socket1,
                vec![(peers[0].public_key(), socket0.into())],
                1_024 * 1_024, // 1MB
            );
            let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
            oracle1
                .update(0, addresses.clone().try_into().unwrap())
                .await;
            let (_sender1, _receiver1) =
                network1.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
            network1.start();

            // Send the first message, which should be allowed and consume the quota
            let msg = vec![0u8; 1024]; // 1KB
            loop {
                // Confirm message is sent to peer
                let sent = sender0
                    .send(Recipients::One(addresses[1].clone()), &msg[..], true)
                    .await
                    .unwrap();
                if !sent.is_empty() {
                    break;
                }

                // Sleep for a full quota period (1 minute) so outbound sends are never
                // rate limited while waiting for the peers to connect
                context.sleep(Duration::from_mins(1)).await
            }

            // Immediately send a second message to trigger the rate limit. Outbound
            // rate limiting skips the peer instead of returning an error, so the
            // returned list of recipients is empty.
            let sent = sender0
                .send(Recipients::One(addresses[1].clone()), &msg[..], true)
                .await
                .unwrap();
            assert!(sent.is_empty());

            // Over the next second, confirm that no rate limiting is reported: the
            // outbound limiter skipped the second message, so it never reached peer 1
            for _ in 0..10 {
                assert_no_rate_limiting(&context);
                context.sleep(Duration::from_millis(100)).await;
            }
        });
    }

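    // A minimal sketch, under stated assumptions, of the outbound behavior that
    // `test_rate_limiting` checks: with a quota of one message per minute, a send that
    // exceeds the quota skips the peer and reports an empty recipient list rather than an
    // error. `FixedWindowSketch`, `send_sketch`, and the test below are hypothetical; the
    // fixed window is a simplification and is not the limiter that `Quota` configures.
    struct FixedWindowSketch {
        // Maximum sends allowed per window
        limit: u32,
        // Window length in milliseconds
        window_ms: u64,
        // Start of the current window (milliseconds)
        window_start_ms: u64,
        // Sends already made in the current window
        used: u32,
    }

    impl FixedWindowSketch {
        fn new(limit: u32, window_ms: u64) -> Self {
            Self {
                limit,
                window_ms,
                window_start_ms: 0,
                used: 0,
            }
        }

        // Returns true (and consumes quota) if a send at `now_ms` fits in the quota
        fn try_acquire(&mut self, now_ms: u64) -> bool {
            // Start a fresh window once the current one has elapsed
            if now_ms >= self.window_start_ms + self.window_ms {
                self.window_start_ms = now_ms;
                self.used = 0;
            }
            if self.used < self.limit {
                self.used += 1;
                true
            } else {
                false
            }
        }
    }

    // Returns the recipients the message was sent to: empty when rate limited
    fn send_sketch(limiter: &mut FixedWindowSketch, peer: &str, now_ms: u64) -> Vec<String> {
        if limiter.try_acquire(now_ms) {
            vec![peer.to_string()]
        } else {
            Vec::new()
        }
    }

    #[test]
    fn sketch_outbound_rate_limit_skips_peer() {
        // One message per minute, mirroring `Quota::per_minute(NZU32!(1))`
        let mut limiter = FixedWindowSketch::new(1, 60_000);
        assert_eq!(send_sketch(&mut limiter, "peer_1", 0), vec!["peer_1".to_string()]);

        // An immediate second send is skipped rather than failing
        assert!(send_sketch(&mut limiter, "peer_1", 0).is_empty());

        // After a full minute the quota is available again
        assert_eq!(send_sketch(&mut limiter, "peer_1", 60_000).len(), 1);
    }
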
    #[test_traced]
    fn test_unordered_peer_sets() {
        let (n, base_port) = (10, 3000);
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let mut peers_and_sks = Vec::new();
            for i in 0..n {
                let sk = ed25519::PrivateKey::from_seed(i as u64);
                let pk = sk.public_key();
                let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
                peers_and_sks.push((sk, pk, addr));
            }
            let peer0 = peers_and_sks[0].clone();
            let config = Config::test(
                peer0.0,
                peer0.2,
                vec![(peer0.1.clone(), peer0.2.into())],
                1_024 * 1_024,
            );
            let (network, mut oracle) = Network::new(context.with_label("network"), config);
            network.start();

            // Subscribe to peer sets
            let mut subscription = oracle.subscribe().await;

            // Register initial peer set
            let set10: Set<_> = peers_and_sks
                .iter()
                .take(2)
                .map(|(_, pk, _)| pk.clone())
                .try_collect()
                .unwrap();
            oracle.update(10, set10.clone()).await;
            let (id, new, all) = subscription.next().await.unwrap();
            assert_eq!(id, 10);
            assert_eq!(new, set10);
            assert_eq!(all, set10);

            // Register an older peer set (index 9 < 10), which should be ignored
            let set9: Set<_> = peers_and_sks
                .iter()
                .skip(2)
                .map(|(_, pk, _)| pk.clone())
                .try_collect()
                .unwrap();
            oracle.update(9, set9.clone()).await;

            // Register a newer peer set; the next notification is for index 11,
            // confirming that index 9 was ignored
            let set11: Set<_> = peers_and_sks
                .iter()
                .skip(4)
                .map(|(_, pk, _)| pk.clone())
                .try_collect()
                .unwrap();
            oracle.update(11, set11.clone()).await;
            let (id, new, all) = subscription.next().await.unwrap();
            assert_eq!(id, 11);
            assert_eq!(new, set11);

            // The tracked peers are the union of sets 10 and 11
            let all_keys: Set<_> = set10
                .into_iter()
                .chain(set11.into_iter())
                .try_collect()
                .unwrap();
            assert_eq!(all, all_keys);
        });
    }

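    // A minimal sketch of the bookkeeping that `test_unordered_peer_sets` exercises: an
    // update whose index is not newer than the latest registered index is ignored, and each
    // accepted update reports `(index, new set, union of all tracked sets)`. The
    // `PeerSetLogSketch` type and its test are hypothetical, use small integers in place of
    // public keys, and assume every accepted set stays tracked (a real tracker may retain
    // only a bounded number of sets).
    struct PeerSetLogSketch {
        // Tracked peer sets keyed by index (BTreeMap keeps indices ordered)
        sets: std::collections::BTreeMap<u64, std::collections::BTreeSet<u8>>,
    }

    impl PeerSetLogSketch {
        fn new() -> Self {
            Self {
                sets: std::collections::BTreeMap::new(),
            }
        }

        // Returns `None` when `index` is not newer than the latest tracked index
        fn update(
            &mut self,
            index: u64,
            peers: std::collections::BTreeSet<u8>,
        ) -> Option<(u64, std::collections::BTreeSet<u8>, std::collections::BTreeSet<u8>)> {
            if let Some((&latest, _)) = self.sets.last_key_value() {
                if index <= latest {
                    return None;
                }
            }
            self.sets.insert(index, peers.clone());

            // The tracked view is the union of every retained set
            let all = self.sets.values().flatten().copied().collect();
            Some((index, peers, all))
        }
    }

    #[test]
    fn sketch_stale_peer_set_is_ignored() {
        let mut log = PeerSetLogSketch::new();
        let set10: std::collections::BTreeSet<u8> = [0, 1].into_iter().collect();
        let set9: std::collections::BTreeSet<u8> = (2..10).collect();
        let set11: std::collections::BTreeSet<u8> = (4..10).collect();

        let (id, new, all) = log.update(10, set10.clone()).unwrap();
        assert_eq!(id, 10);
        assert_eq!(new, set10);
        assert_eq!(all, set10);

        // Index 9 is older than 10, so it is ignored
        assert!(log.update(9, set9).is_none());

        // Index 11 is accepted and the tracked view is the union of sets 10 and 11
        let (id, new, all) = log.update(11, set11.clone()).unwrap();
        assert_eq!(id, 11);
        assert_eq!(new, set11);
        let expected: std::collections::BTreeSet<u8> = set10.union(&set11).copied().collect();
        assert_eq!(all, expected);
    }
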
    #[test_traced]
    fn test_graceful_shutdown() {
        let base_port = 3000;
        let n: usize = 5;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Create networks for all peers
            let (complete_sender, mut complete_receiver) = mpsc::channel(n);
            for (i, peer) in peers.iter().enumerate() {
                let peer_context = context.with_label(&format!("peer_{i}"));
                let port = base_port + i as u16;

                // Create bootstrappers (everyone connects to peer 0)
                let mut bootstrappers = Vec::new();
                if i > 0 {
                    bootstrappers.push((
                        addresses[0].clone(),
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port).into(),
                    ));
                }

                let signer = peer.clone();
                let config = Config::test(
                    signer.clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                    bootstrappers,
                    1_024 * 1_024, // 1MB
                );
                let (mut network, mut oracle) =
                    Network::new(peer_context.with_label("network"), config);

                // Register peer set
                oracle
                    .update(0, addresses.clone().try_into().unwrap())
                    .await;

                let (mut sender, mut receiver) =
                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
                network.start();

                peer_context.with_label("agent").spawn({
                    let mut complete_sender = complete_sender.clone();
                    move |context| async move {
                        // Peer 0 (the bootstrapper) waits until it is connected to every other
                        // peer; every other peer only needs at least one connection
                        let expected_connections = if i == 0 { n - 1 } else { 1 };

                        // Broadcast our identity until enough peers are connected
                        let msg = signer.public_key();
                        loop {
                            let sent = sender
                                .send(Recipients::All, msg.as_ref(), true)
                                .await
                                .unwrap();
                            if sent.len() >= expected_connections {
                                break;
                            }
                            context.sleep(Duration::from_millis(100)).await;
                        }

                        // Signal that this peer is connected
                        complete_sender.send(()).await.unwrap();

                        // Keep receiving messages until shutdown
                        select_loop! {
                            context,
                            on_stopped => {},
                            result = receiver.recv() => {
                                if result.is_err() {
                                    // Channel closed due to shutdown
                                    break;
                                }
                            }
                        }
                    }
                });
            }

            // Wait for all peers to establish connectivity
            for _ in 0..n {
                complete_receiver.next().await.unwrap();
            }

            // Verify that network actors started for all peers
            let metrics_before = context.encode();
            let is_running = |name: &str| -> bool {
                metrics_before.lines().any(|line| {
                    line.starts_with("runtime_tasks_running{")
                        && line.contains(&format!("name=\"{name}\""))
                        && line.contains("kind=\"Task\"")
                        && line.trim_end().ends_with(" 1")
                })
            };
            for i in 0..n {
                let prefix = format!("peer_{i}_network");
                assert!(
                    is_running(&format!("{prefix}_tracker")),
                    "peer_{i} tracker should be running"
                );
                assert!(
                    is_running(&format!("{prefix}_router")),
                    "peer_{i} router should be running"
                );
                assert!(
                    is_running(&format!("{prefix}_spawner")),
                    "peer_{i} spawner should be running"
                );
                assert!(
                    is_running(&format!("{prefix}_listener")),
                    "peer_{i} listener should be running"
                );
                assert!(
                    is_running(&format!("{prefix}_dialer")),
                    "peer_{i} dialer should be running"
                );
            }

            // All peers are connected; trigger a graceful shutdown
            // by stopping the context
            let shutdown_context = context.clone();
            context.with_label("shutdown").spawn(move |_| async move {
                // Trigger graceful shutdown
                let result = shutdown_context.stop(0, Some(Duration::from_secs(5))).await;

                // Shutdown should complete successfully without timeout
                assert!(
                    result.is_ok(),
                    "graceful shutdown should complete: {result:?}"
                );
            });

            // Wait for shutdown to complete
            context.stopped().await.unwrap();

            // Give the runtime a tick to process task completions and update metrics
            context.sleep(Duration::from_millis(100)).await;

            // Verify that all network actors stopped
            let metrics_after = context.encode();
            let is_stopped = |name: &str| -> bool {
                metrics_after.lines().any(|line| {
                    line.starts_with("runtime_tasks_running{")
                        && line.contains(&format!("name=\"{name}\""))
                        && line.contains("kind=\"Task\"")
                        && line.trim_end().ends_with(" 0")
                })
            };
            for i in 0..n {
                let prefix = format!("peer_{i}_network");
                assert!(
                    is_stopped(&format!("{prefix}_tracker")),
                    "peer_{i} tracker should be stopped"
                );
                assert!(
                    is_stopped(&format!("{prefix}_router")),
                    "peer_{i} router should be stopped"
                );
                assert!(
                    is_stopped(&format!("{prefix}_spawner")),
                    "peer_{i} spawner should be stopped"
                );
                assert!(
                    is_stopped(&format!("{prefix}_listener")),
                    "peer_{i} listener should be stopped"
                );
                assert!(
                    is_stopped(&format!("{prefix}_dialer")),
                    "peer_{i} dialer should be stopped"
                );
            }
        });
    }

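    // `test_graceful_shutdown` scans the encoded metrics with two closures that differ only
    // in the expected gauge value (1 while running, 0 once stopped). A minimal sketch of a
    // shared predicate, assuming the `runtime_tasks_running{...} <value>` line format those
    // closures match on; the helper `task_gauge_is`, its test, and the sample metrics text
    // are hypothetical.
    fn task_gauge_is(metrics: &str, name: &str, value: u64) -> bool {
        // Quoting the label value keeps "peer_1_..." from matching "peer_11_..."
        let name_label = format!("name=\"{name}\"");
        let suffix = format!(" {value}");
        metrics.lines().any(|line| {
            line.starts_with("runtime_tasks_running{")
                && line.contains(&name_label)
                && line.contains("kind=\"Task\"")
                && line.trim_end().ends_with(&suffix)
        })
    }

    #[test]
    fn sketch_task_gauge_predicate() {
        let metrics = "runtime_tasks_running{name=\"peer_0_network_tracker\",kind=\"Task\"} 1\n\
                       runtime_tasks_running{name=\"peer_0_network_router\",kind=\"Task\"} 0\n";
        assert!(task_gauge_is(metrics, "peer_0_network_tracker", 1));
        assert!(task_gauge_is(metrics, "peer_0_network_router", 0));
        assert!(!task_gauge_is(metrics, "peer_0_network_router", 1));
    }
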
    #[test_traced]
    fn test_subscription_includes_self_when_registered() {
        let base_port = 3000;
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create self (peer 0) and one other peer
            let self_sk = ed25519::PrivateKey::from_seed(0);
            let self_pk = self_sk.public_key();
            let self_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);

            let other_pk = ed25519::PrivateKey::from_seed(1).public_key();

            // Create network for peer 0 (self)
            let config = Config::test(
                self_sk,
                self_addr,
                vec![], // No bootstrappers
                1_024 * 1_024,
            );
            let (network, mut oracle) = Network::new(context.with_label("network"), config);
            network.start();

            // Subscribe to peer sets
            let mut subscription = oracle.subscribe().await;

            // Register a peer set that does NOT include self
            let peer_set: Set<_> = [other_pk.clone()].try_into().unwrap();
            oracle.update(1, peer_set.clone()).await;

            // Receive subscription notification
            let (id, new, all) = subscription.next().await.unwrap();
            assert_eq!(id, 1);
            assert_eq!(new.len(), 1);
            assert_eq!(all.len(), 1);

            // Self should NOT be in the new set
            assert!(
                new.position(&self_pk).is_none(),
                "new set should not include self"
            );
            assert!(
                new.position(&other_pk).is_some(),
                "new set should include other"
            );

            // Self should NOT be in the tracked set (no registered set includes self yet)
            assert!(
                all.position(&self_pk).is_none(),
                "tracked peers should not include self"
            );
            assert!(
                all.position(&other_pk).is_some(),
                "tracked peers should include other"
            );

            // Now register a peer set that DOES include self
            let peer_set: Set<_> = [self_pk.clone(), other_pk.clone()].try_into().unwrap();
            oracle.update(2, peer_set.clone()).await;

            // Receive subscription notification
            let (id, new, all) = subscription.next().await.unwrap();
            assert_eq!(id, 2);
            assert_eq!(new.len(), 2);
            assert_eq!(all.len(), 2);

            // Both peers should be in the new set
            assert!(
                new.position(&self_pk).is_some(),
                "new set should include self"
            );
            assert!(
                new.position(&other_pk).is_some(),
                "new set should include other"
            );

            // Both peers should be in the tracked set
            assert!(
                all.position(&self_pk).is_some(),
                "tracked peers should include self"
            );
            assert!(
                all.position(&other_pk).is_some(),
                "tracked peers should include other"
            );
        });
    }

    #[test_traced]
    fn test_dns_bootstrapper_resolution() {
        let base_port = 3000;
        let n: usize = 3;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Register DNS mapping for the bootstrapper (peer 0)
            let bootstrapper_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
            context.resolver_register("boot.local", Some(vec![bootstrapper_ip]));

            // Create networks
            let (complete_sender, mut complete_receiver) = mpsc::channel(n);
            for (i, peer) in peers.iter().enumerate() {
                let context = context.with_label(&format!("peer_{i}"));
                let port = base_port + i as u16;

                // Create bootstrappers (non-zero peers reach peer 0 via DNS)
                let bootstrappers = if i > 0 {
                    vec![(
                        addresses[0].clone(),
                        Ingress::Dns {
                            host: hostname!("boot.local"),
                            port: base_port,
                        },
                    )]
                } else {
                    vec![]
                };

                // Create network
                let signer = peer.clone();
                let config = Config::test(
                    signer.clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                    bootstrappers,
                    1_024 * 1_024,
                );
                let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

                // Register peers
                oracle
                    .update(0, addresses.clone().try_into().unwrap())
                    .await;

                // Register channel
                let (mut sender, mut receiver) =
                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);

                network.start();

                // Send and receive messages
                context.with_label("agent").spawn({
                    let mut complete_sender = complete_sender.clone();
                    let addresses = addresses.clone();
                    move |context| async move {
                        // Wait for a message from every other peer
                        let receiver = context.with_label("receiver").spawn(move |_| async move {
                            let mut received = HashSet::new();
                            while received.len() < n - 1 {
                                let (sender, message) = receiver.recv().await.unwrap();
                                assert_eq!(sender.as_ref(), message.as_ref());
                                received.insert(sender);
                            }
                            complete_sender.send(()).await.unwrap();

                            loop {
                                receiver.recv().await.unwrap();
                            }
                        });

                        // Send identity to all peers
                        let msg = signer.public_key();
                        let sender =
                            context
                                .with_label("sender")
                                .spawn(move |context| async move {
                                    loop {
                                        let mut recipients = addresses.clone();
                                        recipients.remove(i);
                                        recipients.sort();

                                        loop {
                                            let mut sent = sender
                                                .send(Recipients::All, msg.as_ref(), true)
                                                .await
                                                .unwrap();
                                            if sent.len() != n - 1 {
                                                context.sleep(Duration::from_millis(100)).await;
                                                continue;
                                            }
                                            sent.sort();
                                            assert_eq!(sent, recipients);
                                            break;
                                        }

                                        context.sleep(Duration::from_secs(10)).await;
                                    }
                                });

                        select! {
                            receiver = receiver => { panic!("receiver exited: {receiver:?}") },
                            sender = sender => { panic!("sender exited: {sender:?}") },
                        }
                    }
                });
            }

            // Wait for all peers to exchange messages
            for _ in 0..n {
                complete_receiver.next().await.unwrap();
            }

            assert_no_rate_limiting(&context);
        });
    }

    #[test_traced]
    fn test_dns_resolution_failure_then_success() {
        let base_port = 3100;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create 2 peers
            let peer0 = ed25519::PrivateKey::from_seed(0);
            let peer1 = ed25519::PrivateKey::from_seed(1);
            let addresses = vec![peer0.public_key(), peer1.public_key()];

            let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
            let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);

            // Do NOT register the DNS mapping yet, so peer 1 fails to resolve its bootstrapper

            // Create network for peer 0 (bootstrapper, no DNS)
            let config0 = Config::test(peer0.clone(), socket0, vec![], 1_024 * 1_024);
            let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
            oracle0
                .update(0, addresses.clone().try_into().unwrap())
                .await;
            let (mut sender0, mut receiver0) =
                network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            network0.start();

            // Create network for peer 1 with DNS bootstrapper
            let config1 = Config::test(
                peer1.clone(),
                socket1,
                vec![(
                    peer0.public_key(),
                    Ingress::Dns {
                        host: hostname!("boot.local"),
                        port: base_port,
                    },
                )],
                1_024 * 1_024,
            );
            let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
            oracle1
                .update(0, addresses.clone().try_into().unwrap())
                .await;
            let (mut sender1, mut receiver1) =
                network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            network1.start();

            // Wait a bit: peer 1 should fail to connect (DNS not registered)
            context.sleep(Duration::from_secs(2)).await;

            // Verify peer 0 cannot send to peer 1 yet
            let sent = sender0
                .send(Recipients::One(peer1.public_key()), b"test".as_ref(), true)
                .await
                .unwrap();
            assert!(sent.is_empty(), "should not be connected yet");

            // Now register the DNS mapping
            context.resolver_register("boot.local", Some(vec![IpAddr::V4(Ipv4Addr::LOCALHOST)]));

            // Peer 1 can now resolve and dial peer 0; each peer sends its own public key,
            // which the other side expects to receive
            let pk0 = peer0.public_key();
            let pk1 = peer1.public_key();
            let msg0 = pk0.to_vec();
            let msg1 = pk1.to_vec();

            // Spawn receiver tasks
            let (done_sender, mut done_receiver) = mpsc::channel::<()>(2);
            let mut done0 = done_sender.clone();
            let pk1_clone = pk1.clone();
            context.with_label("recv0").spawn(move |_| async move {
                let (sender, message) = receiver0.recv().await.unwrap();
                assert_eq!(sender, pk1_clone);
                assert_eq!(message.as_ref(), msg1.as_slice());
                done0.send(()).await.unwrap();
            });
            let mut done1 = done_sender.clone();
            let pk0_clone = pk0.clone();
            context.with_label("recv1").spawn(move |_| async move {
                let (sender, message) = receiver1.recv().await.unwrap();
                assert_eq!(sender, pk0_clone);
                assert_eq!(message.as_ref(), msg0.as_slice());
                done1.send(()).await.unwrap();
            });

            // Send messages until both peers are connected
            context.with_label("sender").spawn({
                let pk0 = pk0.clone();
                let pk1 = pk1.clone();
                move |context| async move {
                    loop {
                        let sent0 = sender0
                            .send(Recipients::One(pk1.clone()), pk0.as_ref(), true)
                            .await
                            .unwrap();
                        let sent1 = sender1
                            .send(Recipients::One(pk0.clone()), pk1.as_ref(), true)
                            .await
                            .unwrap();
                        if !sent0.is_empty() && !sent1.is_empty() {
                            break;
                        }
                        context.sleep(Duration::from_millis(100)).await;
                    }
                }
            });

            // Wait for both receivers to get messages
            done_receiver.next().await.unwrap();
            done_receiver.next().await.unwrap();
        });
    }

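    // A minimal sketch of the dial step that `test_dns_resolution_failure_then_success`
    // relies on: a DNS bootstrapper cannot be dialed until its hostname resolves, and a
    // later retry succeeds once a mapping exists. `ResolverSketch`, `dial_target_sketch`,
    // and the test below are hypothetical stand-ins (not the runtime's resolver API), and
    // the sketch simply picks the first resolved address.
    struct ResolverSketch {
        // Hostname to resolved addresses
        table: std::collections::HashMap<String, Vec<std::net::IpAddr>>,
    }

    impl ResolverSketch {
        // Register (or replace) the addresses for `host`
        fn register(&mut self, host: &str, ips: Vec<std::net::IpAddr>) {
            self.table.insert(host.to_string(), ips);
        }
    }

    // Returns the socket address to dial, or `None` if `host` does not resolve yet
    fn dial_target_sketch(
        resolver: &ResolverSketch,
        host: &str,
        port: u16,
    ) -> Option<std::net::SocketAddr> {
        resolver
            .table
            .get(host)
            .and_then(|ips| ips.first())
            .map(|ip| std::net::SocketAddr::new(*ip, port))
    }

    #[test]
    fn sketch_dns_bootstrapper_resolves_after_registration() {
        let mut resolver = ResolverSketch {
            table: std::collections::HashMap::new(),
        };

        // Before registration the bootstrapper cannot be dialed
        assert_eq!(dial_target_sketch(&resolver, "boot.local", 3100), None);

        // After registration a retry yields a dialable address
        let localhost = std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST);
        resolver.register("boot.local", vec![localhost]);
        assert_eq!(
            dial_target_sketch(&resolver, "boot.local", 3100),
            Some(std::net::SocketAddr::new(localhost, 3100))
        );
    }
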
    /// Helper to run DNS connectivity test with a specific seed and return auditor state.
    fn run_dns_connectivity(seed: u64) -> String {
        let base_port = 3400;
        let n: usize = 3;

        let executor = deterministic::Runner::seeded(seed);
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Register DNS mapping for the bootstrapper (peer 0)
            let bootstrapper_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
            context.resolver_register("boot.local", Some(vec![bootstrapper_ip]));

            // Create networks
            let (complete_sender, mut complete_receiver) = mpsc::channel(n);
            for (i, peer) in peers.iter().enumerate() {
                let context = context.with_label(&format!("peer_{i}"));
                let port = base_port + i as u16;

                // Non-zero peers reach the bootstrapper (peer 0) via DNS
                let bootstrappers = if i > 0 {
                    vec![(
                        addresses[0].clone(),
                        Ingress::Dns {
                            host: hostname!("boot.local"),
                            port: base_port,
                        },
                    )]
                } else {
                    vec![]
                };

                let signer = peer.clone();
                let config = Config::test(
                    signer.clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                    bootstrappers,
                    1_024 * 1_024,
                );
                let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
                oracle
                    .update(0, addresses.clone().try_into().unwrap())
                    .await;
                let (mut sender, mut receiver) =
                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
                network.start();

                context.with_label("agent").spawn({
                    let mut complete_sender = complete_sender.clone();
                    let addresses = addresses.clone();
                    move |context| async move {
                        let receiver = context.with_label("receiver").spawn(move |_| async move {
                            let mut received = HashSet::new();
                            while received.len() < n - 1 {
                                let (sender, message) = receiver.recv().await.unwrap();
                                assert_eq!(sender.as_ref(), message.as_ref());
                                received.insert(sender);
                            }
                            complete_sender.send(()).await.unwrap();
                            loop {
                                receiver.recv().await.unwrap();
                            }
                        });

                        let msg = signer.public_key();
                        let sender =
                            context
                                .with_label("sender")
                                .spawn(move |context| async move {
                                    loop {
                                        let mut recipients = addresses.clone();
                                        recipients.remove(i);
                                        recipients.sort();

                                        loop {
                                            let mut sent = sender
                                                .send(Recipients::All, msg.as_ref(), true)
                                                .await
                                                .unwrap();
                                            if sent.len() != n - 1 {
                                                context.sleep(Duration::from_millis(100)).await;
                                                continue;
                                            }
1449                                            sent.sort();
1450                                            assert_eq!(sent, recipients);
1451                                            break;
1452                                        }
1453
1454                                        context.sleep(Duration::from_secs(10)).await;
1455                                    }
1456                                });
1457
1458                        select! {
1459                            receiver = receiver => { panic!("receiver exited: {receiver:?}") },
1460                            sender = sender => { panic!("sender exited: {sender:?}") },
1461                        }
1462                    }
1463                });
1464            }
1465
1466            for _ in 0..n {
1467                complete_receiver.next().await.unwrap();
1468            }
1469
1470            context.auditor().state()
1471        })
1472    }
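
    // A minimal synchronous sketch of the polling pattern these tests use: repeat
    // an attempt (standing in for `sender.send(..)`) and stop once the result
    // satisfies a predicate (e.g. `sent.len() == n - 1`). Otherwise, wait before
    // retrying (standing in for `context.sleep(..)`). `sketch_poll_until` is a
    // hypothetical helper, not part of this crate. Unlike the tests, it gives up
    // after `max_attempts` instead of relying on the runner's timeout.
    #[allow(dead_code)]
    fn sketch_poll_until<T>(
        mut attempt: impl FnMut() -> T,
        mut done: impl FnMut(&T) -> bool,
        mut wait: impl FnMut(),
        max_attempts: usize,
    ) -> Option<(T, usize)> {
        for attempt_number in 1..=max_attempts {
            let result = attempt();
            if done(&result) {
                // Return the satisfying result and how many attempts it took
                return Some((result, attempt_number));
            }
            // Not done yet: back off before the next attempt
            wait();
        }
        None
    }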

    #[test_traced]
    fn test_dns_resolution_determinism() {
        // Run the same scenario twice with the same seed; the auditor states must match
        let state1 = run_dns_connectivity(42);
        let state2 = run_dns_connectivity(42);
        assert_eq!(state1, state2, "DNS resolution should be deterministic");
    }

    #[test_traced]
    fn test_dns_resolving_to_private_ip_not_dialed() {
        // Test that when allow_private_ips=false, DNS addresses that resolve
        // to private IPs are not dialed.
        let base_port = 3300;
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let peer0 = ed25519::PrivateKey::from_seed(0);
            let peer1 = ed25519::PrivateKey::from_seed(1);

            let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
            let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);

            // Register a DNS mapping that resolves to localhost (a private IP)
            context.resolver_register("boot.local".to_string(), Some(vec![socket0.ip()]));

            let addresses: Vec<_> = vec![peer0.public_key(), peer1.public_key()];

            // Create peer 0 (bootstrapper) with allow_private_ips=true
            let mut config0 = Config::test(peer0.clone(), socket0, vec![], 1_024 * 1_024);
            config0.allow_private_ips = true;
            let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
            oracle0
                .update(0, addresses.clone().try_into().unwrap())
                .await;
            let (_sender0, mut receiver0) =
                network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            network0.start();

            // Create peer 1 with allow_private_ips=false and a DNS bootstrapper
            let bootstrappers = vec![(
                peer0.public_key(),
                Ingress::Dns {
                    host: hostname!("boot.local"),
                    port: base_port,
                },
            )];
            let mut config1 = Config::test(peer1.clone(), socket1, bootstrappers, 1_024 * 1_024);
            config1.allow_private_ips = false; // This should prevent dialing the private IP
            let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
            oracle1
                .update(0, addresses.clone().try_into().unwrap())
                .await;
            let (mut sender1, _receiver1) =
                network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            network1.start();

            // Wait for a period during which peer 1 would normally connect
            context.sleep(Duration::from_secs(5)).await;

            // Try to send from peer 1: it should reach no one, since dialing private IPs is disallowed
            let sent = sender1
                .send(Recipients::All, peer1.public_key().as_ref(), true)
                .await
                .unwrap();
            assert!(
                sent.is_empty(),
                "peer 1 should not have connected to peer 0 (private IP)"
            );

            // Verify peer 0 received nothing from peer 1
            select! {
                msg = receiver0.recv() => {
                    panic!("peer 0 should not have received any message, got: {msg:?}");
                },
                _ = context.sleep(Duration::from_secs(1)) => {
                    // Expected: timeout with no message
                }
            }
        });
    }
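
    // A minimal sketch of the rule `test_dns_resolving_to_private_ip_not_dialed`
    // exercises. With `allow_private_ips = false`, a resolved address is skipped
    // when it is loopback, private (RFC 1918 / IPv6 unique local), link-local, or
    // unspecified. `sketch_is_private_ip` and `sketch_should_dial` are hypothetical
    // helpers built on `std::net` predicates. The exact set of ranges this crate
    // filters may differ.
    #[allow(dead_code)]
    fn sketch_is_private_ip(ip: std::net::IpAddr) -> bool {
        match ip {
            std::net::IpAddr::V4(v4) => {
                v4.is_loopback() || v4.is_private() || v4.is_link_local() || v4.is_unspecified()
            }
            std::net::IpAddr::V6(v6) => {
                let first = v6.segments()[0];
                v6.is_loopback()
                    || v6.is_unspecified()
                    || (first & 0xfe00) == 0xfc00 // unique local, fc00::/7
                    || (first & 0xffc0) == 0xfe80 // link-local, fe80::/10
            }
        }
    }

    #[allow(dead_code)]
    fn sketch_should_dial(ip: std::net::IpAddr, allow_private_ips: bool) -> bool {
        // Private addresses are only dialed when explicitly allowed
        allow_private_ips || !sketch_is_private_ip(ip)
    }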

    #[test_traced]
    fn test_dns_mixed_ips_connectivity() {
        // Test that peers can connect even when DNS resolves to multiple IPs
        // where most are unreachable. The dialer randomly picks an IP, so
        // eventually it should pick the reachable one.
        //
        // Run with 25 different seeds so the random IP selection is exercised.
        for seed in 0..25 {
            let base_port = 3400;

            let cfg = deterministic::Config::default()
                .with_seed(seed)
                .with_timeout(Some(Duration::from_secs(120)));
            let executor = deterministic::Runner::new(cfg);
            executor.start(|context| async move {
                let peer0 = ed25519::PrivateKey::from_seed(0);
                let peer1 = ed25519::PrivateKey::from_seed(1);

                let good_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
                let socket0 = SocketAddr::new(good_ip, base_port);
                let socket1 = SocketAddr::new(good_ip, base_port + 1);

                // For each peer, register a DNS mapping with 3 unreachable IPs
                // (nothing listens on them) and 1 reachable IP
                let mut all_ips0: Vec<IpAddr> = (1..=3)
                    .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 100 + i)))
                    .collect();
                all_ips0.push(good_ip);
                context.resolver_register("peer-0.local", Some(all_ips0));

                let mut all_ips1: Vec<IpAddr> = (1..=3)
                    .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 110 + i)))
                    .collect();
                all_ips1.push(good_ip);
                context.resolver_register("peer-1.local", Some(all_ips1));

                let addresses: Vec<_> = vec![peer0.public_key(), peer1.public_key()];

                // Create peer 0 with peer 1 as DNS bootstrapper
                let bootstrappers0 = vec![(
                    peer1.public_key(),
                    Ingress::Dns {
                        host: hostname!("peer-1.local"),
                        port: base_port + 1,
                    },
                )];
                let config0 = Config::test(peer0.clone(), socket0, bootstrappers0, 1_024 * 1_024);
                let (mut network0, mut oracle0) =
                    Network::new(context.with_label("peer_0"), config0);
                oracle0
                    .update(0, addresses.clone().try_into().unwrap())
                    .await;
                let (_sender0, mut receiver0) =
                    network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
                network0.start();

                // Create peer 1 with peer 0 as DNS bootstrapper
                let bootstrappers1 = vec![(
                    peer0.public_key(),
                    Ingress::Dns {
                        host: hostname!("peer-0.local"),
                        port: base_port,
                    },
                )];
                let config1 = Config::test(peer1.clone(), socket1, bootstrappers1, 1_024 * 1_024);
                let (mut network1, mut oracle1) =
                    Network::new(context.with_label("peer_1"), config1);
                oracle1
                    .update(0, addresses.clone().try_into().unwrap())
                    .await;
                let (mut sender1, _receiver1) =
                    network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
                network1.start();

                // Wait for peers to connect (may take multiple attempts due to random IP selection)
                let pk0 = peer0.public_key();
                loop {
                    let sent = sender1
                        .send(
                            Recipients::One(pk0.clone()),
                            peer1.public_key().as_ref(),
                            true,
                        )
                        .await
                        .unwrap();
                    if !sent.is_empty() {
                        break;
                    }
                    context.sleep(Duration::from_millis(100)).await;
                }

                // Verify peer 0 received the message
                let (sender, msg) = receiver0.recv().await.unwrap();
                assert_eq!(sender, peer1.public_key());
                assert_eq!(msg.as_ref(), peer1.public_key().as_ref());
            });
        }
    }
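
    // A minimal sketch of why `test_dns_mixed_ips_connectivity` converges. Suppose
    // each dial attempt picks one resolved IP uniformly at random and only 1 of the
    // `total` IPs is reachable. Then the chance that `k` independent attempts all
    // miss is ((total - 1) / total)^k, e.g. (3/4)^k for the 4-IP mappings above.
    // The SplitMix64 generator and these helpers are hypothetical stand-ins for the
    // runtime's seeded RNG and the dialer's selection logic. The modulo step adds a
    // negligible bias for short lists.
    #[allow(dead_code)]
    fn sketch_miss_probability(total: u32, attempts: i32) -> f64 {
        let miss = (total - 1) as f64 / total as f64;
        miss.powi(attempts)
    }

    #[allow(dead_code)]
    fn sketch_splitmix64(state: &mut u64) -> u64 {
        // SplitMix64: a small, seedable PRNG that works for any seed (including 0)
        *state = state.wrapping_add(0x9E37_79B9_7F4A_7C15);
        let mut z = *state;
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    }

    #[allow(dead_code)]
    fn sketch_attempts_until_good(
        seed: u64,
        ips: &[std::net::IpAddr],
        good: std::net::IpAddr,
        max_attempts: usize,
    ) -> Option<usize> {
        if ips.is_empty() {
            return None;
        }
        let mut state = seed;
        for attempt in 1..=max_attempts {
            // Pick one resolved IP per attempt, as a randomizing dialer might
            let idx = (sketch_splitmix64(&mut state) % ips.len() as u64) as usize;
            if ips[idx] == good {
                return Some(attempt);
            }
        }
        None
    }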

    #[test_traced]
    fn test_many_peer_restart_with_new_address() {
        let base_port = 7500;
        let n = 5;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let peers: Vec<_> = (0..n)
                .map(|i| ed25519::PrivateKey::from_seed(i as u64))
                .collect();
            let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();

            // Track senders/receivers/handles across restarts
            let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
            let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
            let mut handles: Vec<Option<Handle<()>>> = (0..n).map(|_| None).collect();

            // Track port allocations (updated on restart)
            let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();

            // Create networks for all peers (peer 0 is the bootstrapper)
            for (i, peer) in peers.iter().enumerate() {
                let peer_context = context.with_label(&format!("peer_{i}"));

                // Non-bootstrapper peers point to peer 0
                let mut bootstrappers = Vec::new();
                if i > 0 {
                    bootstrappers.push((
                        addresses[0].clone(),
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
                    ));
                }

                let config = Config::test(
                    peer.clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
                    bootstrappers,
                    MAX_MESSAGE_SIZE,
                );
                let (mut network, mut oracle) =
                    Network::new(peer_context.with_label("network"), config);

                // Register peer set
                oracle
                    .update(0, addresses.clone().try_into().unwrap())
                    .await;

                let (sender, receiver) =
                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
                senders[i] = Some(sender);
                receivers[i] = Some(receiver);

                let handle = network.start();
                handles[i] = Some(handle);
            }

            // Wait for full connectivity (each peer can send to all others)
            for (i, sender) in senders.iter_mut().enumerate() {
                let sender = sender.as_mut().unwrap();
                loop {
                    let sent = sender
                        .send(Recipients::All, peers[i].public_key().as_ref(), true)
                        .await
                        .unwrap();
                    if sent.len() == n - 1 {
                        break;
                    }
                    context.sleep(Duration::from_millis(100)).await;
                }
            }

            // Verify each peer can receive from all others
            for receiver in receivers.iter_mut() {
                let receiver = receiver.as_mut().unwrap();
                let mut received = HashSet::new();
                while received.len() < n - 1 {
                    let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
                    assert_eq!(sender.as_ref(), message.as_ref());
                    received.insert(sender);
                }
            }

            // Restart each non-bootstrapper peer on a new port, over three rounds
            let mut restart_counter = 0u16;
            for round in 0..3 {
                for restart_peer_idx in 1..n {
                    // Allocate a new unique port
                    restart_counter += 1;
                    let new_port = base_port + 100 + restart_counter;
                    ports[restart_peer_idx] = new_port;

                    // Abort the peer's network
                    if let Some(handle) = handles[restart_peer_idx].take() {
                        handle.abort();
                    }
                    senders[restart_peer_idx] = None;
                    receivers[restart_peer_idx] = None;

                    // Restart the peer on the new port (uses the bootstrapper for discovery)
                    let peer_context =
                        context.with_label(&format!("peer_{restart_peer_idx}_round_{round}"));
                    let bootstrappers = vec![(
                        addresses[0].clone(),
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
                    )];
                    let config = Config::test(
                        peers[restart_peer_idx].clone(),
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), new_port),
                        bootstrappers,
                        MAX_MESSAGE_SIZE,
                    );
                    let (mut network, mut oracle) =
                        Network::new(peer_context.with_label("network"), config);

                    // Register peer set
                    oracle
                        .update(0, addresses.clone().try_into().unwrap())
                        .await;

                    let (sender, receiver) = network.register(
                        0,
                        Quota::per_second(NZU32!(100)),
                        DEFAULT_MESSAGE_BACKLOG,
                    );
                    senders[restart_peer_idx] = Some(sender);
                    receivers[restart_peer_idx] = Some(receiver);

                    let handle = network.start();
                    handles[restart_peer_idx] = Some(handle);

                    // Wait for the restarted peer to reconnect to all others
                    let restarted_sender = senders[restart_peer_idx].as_mut().unwrap();
                    loop {
                        let sent = restarted_sender
                            .send(
                                Recipients::All,
                                peers[restart_peer_idx].public_key().as_ref(),
                                true,
                            )
                            .await
                            .unwrap();
                        if sent.len() == n - 1 {
                            break;
                        }
                        context.sleep(Duration::from_millis(100)).await;
                    }

                    // Verify other peers can send to the restarted peer
                    for i in 0..n {
                        if i == restart_peer_idx {
                            continue;
                        }
                        let sender = senders[i].as_mut().unwrap();
                        loop {
                            let sent = sender
                                .send(
                                    Recipients::One(addresses[restart_peer_idx].clone()),
                                    peers[i].public_key().as_ref(),
                                    true,
                                )
                                .await
                                .unwrap();
                            if sent.len() == 1 {
                                break;
                            }
                            context.sleep(Duration::from_millis(100)).await;
                        }
                    }

                    // Verify the restarted peer receives messages from all others
                    let restarted_receiver = receivers[restart_peer_idx].as_mut().unwrap();
                    let mut received = HashSet::new();
                    while received.len() < n - 1 {
                        let (sender, message): (ed25519::PublicKey, _) =
                            restarted_receiver.recv().await.unwrap();
                        assert_eq!(sender.as_ref(), message.as_ref());
                        received.insert(sender);
                    }
                }
            }

            assert_no_rate_limiting(&context);
        });
    }
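
    // A minimal sketch of the port arithmetic in `test_many_peer_restart_with_new_address`.
    // Peers first listen on `base_port..base_port + n`. Every restart then takes
    // `base_port + 100 + restart_counter`, with `restart_counter` starting at 1, so a
    // restart never reuses an earlier port as long as `n <= 101`.
    // `sketch_restart_ports` is a hypothetical helper that only replays that
    // arithmetic; it does not start any network.
    #[allow(dead_code)]
    fn sketch_restart_ports(base_port: u16, n: usize, rounds: usize) -> Vec<u16> {
        // Initial listen ports, one per peer
        let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
        let mut restart_counter = 0u16;
        for _round in 0..rounds {
            // Every peer except the bootstrapper (peer 0) restarts once per round
            for _peer in 1..n {
                restart_counter += 1;
                ports.push(base_port + 100 + restart_counter);
            }
        }
        ports
    }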

    #[test_traced]
    fn test_simultaneous_peer_restart() {
        let base_port = 7700;
        let n = 5;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let peers: Vec<_> = (0..n)
                .map(|i| ed25519::PrivateKey::from_seed(i as u64))
                .collect();
            let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();

            // Track port allocations (updated on restart)
            let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();

            // Track senders/receivers/handles across restarts
            let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
            let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
            let mut handles: Vec<Option<Handle<()>>> = (0..n).map(|_| None).collect();

            // Create networks for all peers (peer 0 is the bootstrapper)
            for (i, peer) in peers.iter().enumerate() {
                let peer_context = context.with_label(&format!("peer_{i}"));

                // Non-bootstrapper peers point to peer 0
                let mut bootstrappers = Vec::new();
                if i > 0 {
                    bootstrappers.push((
                        addresses[0].clone(),
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
                    ));
                }

                let config = Config::test(
                    peer.clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
                    bootstrappers,
                    MAX_MESSAGE_SIZE,
                );
                let (mut network, mut oracle) =
                    Network::new(peer_context.with_label("network"), config);

                // Register peer set
                oracle
                    .update(0, addresses.clone().try_into().unwrap())
                    .await;

                let (sender, receiver) =
                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
                senders[i] = Some(sender);
                receivers[i] = Some(receiver);

                let handle = network.start();
                handles[i] = Some(handle);
            }

            // Wait for full connectivity (each peer can send to all others)
            for (i, sender) in senders.iter_mut().enumerate() {
                let sender = sender.as_mut().unwrap();
                loop {
                    let sent = sender
                        .send(Recipients::All, peers[i].public_key().as_ref(), true)
                        .await
                        .unwrap();
                    if sent.len() == n - 1 {
                        break;
                    }
                    context.sleep(Duration::from_millis(100)).await;
                }
            }

            // Verify each peer can receive from all others
            for receiver in receivers.iter_mut() {
                let receiver = receiver.as_mut().unwrap();
                let mut received = HashSet::new();
                while received.len() < n - 1 {
                    let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
                    assert_eq!(sender.as_ref(), message.as_ref());
                    received.insert(sender);
                }
            }

            // Shut down all non-bootstrapper peers simultaneously.
            //
            // We keep the bootstrapper (peer 0) alive to exercise the case
            // where multiple peers churn at once.
            let restart_peers: Vec<usize> = (1..n).collect();
            for &idx in &restart_peers {
                if let Some(handle) = handles[idx].take() {
                    handle.abort();
                }
                senders[idx] = None;
                receivers[idx] = None;
                // Allocate new ports for restarted peers
                ports[idx] = base_port + 100 + idx as u16;
            }

            // Wait for connections to be detected as closed
            context.sleep(Duration::from_secs(2)).await;

            // Restart all non-bootstrapper peers on new ports (uses the bootstrapper for discovery)
            for &idx in &restart_peers {
                let peer_context = context.with_label(&format!("peer_{idx}_restarted"));
                let bootstrappers = vec![(
                    addresses[0].clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
                )];
                let config = Config::test(
                    peers[idx].clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[idx]),
                    bootstrappers,
                    MAX_MESSAGE_SIZE,
                );
                let (mut network, mut oracle) =
                    Network::new(peer_context.with_label("network"), config);

                // Register peer set
                oracle
                    .update(0, addresses.clone().try_into().unwrap())
                    .await;

                let (sender, receiver) =
                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
                senders[idx] = Some(sender);
                receivers[idx] = Some(receiver);

                let handle = network.start();
                handles[idx] = Some(handle);
            }

            // Wait for full connectivity after restart
            for (i, sender) in senders.iter_mut().enumerate() {
                let sender = sender.as_mut().unwrap();
                loop {
                    let sent = sender
                        .send(Recipients::All, peers[i].public_key().as_ref(), true)
                        .await
                        .unwrap();
                    if sent.len() == n - 1 {
                        break;
                    }
                    context.sleep(Duration::from_millis(100)).await;
                }
            }

            // Verify each peer can receive from all others after restart
            for receiver in receivers.iter_mut() {
                let receiver = receiver.as_mut().unwrap();
                let mut received = HashSet::new();
                while received.len() < n - 1 {
                    let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
                    assert_eq!(sender.as_ref(), message.as_ref());
                    received.insert(sender);
                }
            }

            assert_no_rate_limiting(&context);
        });
    }
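
    // A minimal synchronous sketch of the receive-side check these tests repeat:
    // keep reading `(sender, message)` pairs until `n - 1` distinct senders have
    // been seen, asserting that every message echoes its sender's public key.
    // `sketch_collect_distinct_senders` is hypothetical. It reads from a plain
    // iterator instead of `channels::Receiver` and returns `None` if the iterator
    // ends first, whereas the async tests would keep waiting until the runner
    // times out. `n` must be at least 1.
    #[allow(dead_code)]
    fn sketch_collect_distinct_senders<K, I>(
        messages: I,
        n: usize,
    ) -> Option<std::collections::HashSet<K>>
    where
        K: AsRef<[u8]> + Eq + std::hash::Hash,
        I: IntoIterator<Item = (K, Vec<u8>)>,
    {
        let mut received = std::collections::HashSet::new();
        let mut messages = messages.into_iter();
        while received.len() < n - 1 {
            let (sender, message) = messages.next()?;
            // Each test message is the sender's own public key
            assert_eq!(sender.as_ref(), message.as_slice());
            // Duplicates (e.g. from retried sends) are absorbed by the set
            received.insert(sender);
        }
        Some(received)
    }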
1998    #[test_traced]
1999    fn test_peer_restart_with_new_address_must_dial() {
2000        let base_port = 3600;
2001        let n: usize = 5;
2002
2003        let executor = deterministic::Runner::default();
2004        executor.start(|context| async move {
2005            // Create peers
2006            let peers: Vec<_> = (0..n)
2007                .map(|i| ed25519::PrivateKey::from_seed(i as u64))
2008                .collect();
2009            let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
2010
2011            // Track all senders/receivers/handles across restarts
2012            let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
2013            let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
2014            let mut handles: Vec<Option<Handle<()>>> = (0..n).map(|_| None).collect();
2015
2016            // Track port allocations for each peer (updated on restart)
2017            let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
2018
2019            // Peer 2 will advertise a WRONG dialable address (unreachable IP)
2020            // All other peers advertise correct addresses
2021            // This means when peer 1 restarts, it CANNOT dial peer 2 (wrong address)
2022            // Peer 2 must dial peer 1 to reconnect
2023            let wrong_ip = IpAddr::V4(Ipv4Addr::new(10, 255, 255, 1)); // Unreachable IP
2024            let wrong_address_peer_idx = 2;
2025
2026            for (i, peer) in peers.iter().enumerate() {
2027                let peer_context = context.with_label(&format!("peer_{i}"));
2028                let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]);
2029
2030                // Peer 2 advertises wrong IP, others advertise correct addresses
2031                let dialable_addr: Ingress = if i == wrong_address_peer_idx {
2032                    SocketAddr::new(wrong_ip, ports[i]).into()
2033                } else {
2034                    listen_addr.into()
2035                };
2036
2037                // Create bootstrappers (everyone connects to peer 0)
2038                let mut bootstrappers = Vec::new();
2039                if i > 0 {
2040                    bootstrappers.push((
2041                        addresses[0].clone(),
2042                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
2043                    ));
2044                }
2045
2046                let mut config =
2047                    Config::test(peer.clone(), listen_addr, bootstrappers, 1_024 * 1_024);
2048                config.dialable = dialable_addr;
2049
2050                let (mut network, mut oracle) =
2051                    Network::new(peer_context.with_label("network"), config);
2052
2053                oracle
2054                    .update(0, addresses.clone().try_into().unwrap())
2055                    .await;
2056
2057                let (sender, receiver) =
2058                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2059                senders[i] = Some(sender);
2060                receivers[i] = Some(receiver);
2061
2062                let handle = network.start();
2063                handles[i] = Some(handle);
2064            }

            // Wait for full connectivity
            for (i, sender) in senders.iter_mut().enumerate() {
                let sender = sender.as_mut().unwrap();
                loop {
                    let sent = sender
                        .send(Recipients::All, peers[i].public_key().as_ref(), true)
                        .await
                        .unwrap();
                    if sent.len() == n - 1 {
                        break;
                    }
                    context.sleep(Duration::from_millis(100)).await;
                }
            }

            // Verify each peer can receive from all others
            for receiver in receivers.iter_mut() {
                let receiver = receiver.as_mut().unwrap();
                let mut received = HashSet::new();
                while received.len() < n - 1 {
                    let (sender, message) = receiver.recv().await.unwrap();
                    assert_eq!(sender.as_ref(), message.as_ref());
                    received.insert(sender);
                }
            }

            // Restart peer 1 with a new port
            // After restart, peer 1 can dial peers 0, 3, 4 (correct addresses)
            // but CANNOT dial peer 2 (wrong address).
            // Peer 2 must dial peer 1 to reconnect (after learning peer 1's new address
            // through gossip from the bootstrapper).
            let restart_peer_idx = 1;
            let new_port = base_port + 100;
            ports[restart_peer_idx] = new_port;

            // Abort the peer's network
            if let Some(handle) = handles[restart_peer_idx].take() {
                handle.abort();
            }
            senders[restart_peer_idx] = None;
            receivers[restart_peer_idx] = None;

            // Restart the peer with a NEW port and CORRECT dialable address
            let peer_context = context.with_label(&format!("peer_{restart_peer_idx}_restarted"));
            let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), new_port);
            let bootstrappers = vec![(
                addresses[0].clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
            )];

            let config = Config::test(
                peers[restart_peer_idx].clone(),
                listen_addr,
                bootstrappers,
                1_024 * 1_024,
            );

            let (mut network, mut oracle) =
                Network::new(peer_context.with_label("network"), config);

            oracle
                .update(0, addresses.clone().try_into().unwrap())
                .await;

            let (sender, receiver) =
                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            senders[restart_peer_idx] = Some(sender);
            receivers[restart_peer_idx] = Some(receiver);

            let handle = network.start();
            handles[restart_peer_idx] = Some(handle);

            // Wait for the restarted peer to reconnect to all other peers
            // For peer 2 (wrong address), this MUST happen via:
            // 1. Restarted peer dials peer 0 (bootstrapper)
            // 2. Peer 0 gossips restarted peer's new address to peer 2
            // 3. Peer 2 dials the restarted peer
            let restarted_sender = senders[restart_peer_idx].as_mut().unwrap();
            loop {
                let sent = restarted_sender
                    .send(
                        Recipients::All,
                        peers[restart_peer_idx].public_key().as_ref(),
                        true,
                    )
                    .await
                    .unwrap();
                if sent.len() == n - 1 {
                    break;
                }
                context.sleep(Duration::from_millis(100)).await;
            }

            // Verify all other peers can send to the restarted peer
            for i in 0..n {
                if i == restart_peer_idx {
                    continue;
                }
                let sender = senders[i].as_mut().unwrap();
                loop {
                    let sent = sender
                        .send(
                            Recipients::One(addresses[restart_peer_idx].clone()),
                            peers[i].public_key().as_ref(),
                            true,
                        )
                        .await
                        .unwrap();
                    if sent.len() == 1 {
                        break;
                    }
                    context.sleep(Duration::from_millis(100)).await;
                }
            }

            // Verify the restarted peer receives from all others
            let restarted_receiver = receivers[restart_peer_idx].as_mut().unwrap();
            let mut received = HashSet::new();
            while received.len() < n - 1 {
                let (sender, message) = restarted_receiver.recv().await.unwrap();
                assert_eq!(sender.as_ref(), message.as_ref());
                received.insert(sender);
            }

            assert_no_rate_limiting(&context);
        });
    }
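
    // Hypothetical, std-only sketch (not this crate's types or API) of the
    // discovery exchange the restart test above relies on, following the module
    // docs: peers share a sorted peer list, each advertises a bit vector where
    // `true` means "I know a dialable address for this peer", and an address
    // that has failed `max_failed_dials` dials is treated as stale and reported
    // as `false`. A responder then shares every address it still considers
    // fresh for the bits the requester reported as `false`. `SketchEntry`,
    // `SketchBook`, `sketch_bits`, and `sketch_respond` are illustrative names.
    struct SketchEntry {
        addr: std::net::SocketAddr,
        failed_dials: usize,
    }

    type SketchBook = std::collections::HashMap<u8, SketchEntry>;

    /// An entry is fresh while it has failed fewer than `max_failed_dials` dials.
    fn sketch_is_fresh(book: &SketchBook, peer: u8, max_failed_dials: usize) -> bool {
        book.get(&peer)
            .is_some_and(|entry| entry.failed_dials < max_failed_dials)
    }

    /// Builds the bit vector a peer would advertise for `sorted_peers`.
    fn sketch_bits(sorted_peers: &[u8], book: &SketchBook, max_failed_dials: usize) -> Vec<bool> {
        sorted_peers
            .iter()
            .map(|&peer| sketch_is_fresh(book, peer, max_failed_dials))
            .collect()
    }

    /// Returns the fresh addresses the responder knows for the requester's `false` bits.
    fn sketch_respond(
        sorted_peers: &[u8],
        requester_bits: &[bool],
        responder_book: &SketchBook,
        max_failed_dials: usize,
    ) -> Vec<(u8, std::net::SocketAddr)> {
        sorted_peers
            .iter()
            .zip(requester_bits)
            .filter(|&(_, &known)| !known)
            .filter(|&(&peer, _)| sketch_is_fresh(responder_book, peer, max_failed_dials))
            .map(|(&peer, _)| (peer, responder_book[&peer].addr))
            .collect()
    }

    #[test]
    fn test_sketch_stale_address_refreshed_by_bootstrapper() {
        let addr = |port: u16| std::net::SocketAddr::from(([127, 0, 0, 1], port));
        let sorted_peers = [0u8, 1, 2, 3, 4];
        let max_failed_dials = 3;

        // Peer 2 still holds peer 1's old address and has failed to dial it
        // `max_failed_dials` times, so it reports peer 1's bit as unknown.
        let mut peer2_book = SketchBook::new();
        for peer in sorted_peers {
            let entry = SketchEntry { addr: addr(3000 + u16::from(peer)), failed_dials: 0 };
            peer2_book.insert(peer, entry);
        }
        peer2_book.get_mut(&1).unwrap().failed_dials = max_failed_dials;
        let bits = sketch_bits(&sorted_peers, &peer2_book, max_failed_dials);
        assert_eq!(bits, vec![true, false, true, true, true]);

        // Peer 0 (the bootstrapper) learned peer 1's new port when peer 1 dialed it.
        let mut peer0_book = SketchBook::new();
        for peer in sorted_peers {
            let entry = SketchEntry { addr: addr(3000 + u16::from(peer)), failed_dials: 0 };
            peer0_book.insert(peer, entry);
        }
        peer0_book.get_mut(&1).unwrap().addr = addr(3100);

        // Peer 0 fills in only the bit peer 2 is missing: peer 1's new address.
        let response = sketch_respond(&sorted_peers, &bits, &peer0_book, max_failed_dials);
        assert_eq!(response, vec![(1, addr(3100))]);
    }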

    #[test_traced]
    fn test_operations_after_shutdown_do_not_panic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let peer = ed25519::PrivateKey::from_seed(0);
            let address = peer.public_key();

            let peer_context = context.with_label("peer");
            let config = Config::test(
                peer.clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000),
                vec![],
                MAX_MESSAGE_SIZE,
            );
            let (mut network, mut oracle) =
                Network::new(peer_context.with_label("network"), config);

            // Register channel and peer set
            let (mut sender, _receiver) =
                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            let peers: Set<ed25519::PublicKey> = vec![address.clone()].try_into().unwrap();
            oracle.update(0, peers.clone()).await;

            // Start and immediately abort the network
            let handle = network.start();
            handle.abort();

            // Wait for shutdown to propagate
            context.sleep(Duration::from_millis(100)).await;

            // Oracle operations should not panic even after shutdown
            oracle.update(1, peers.clone()).await;
            let _ = oracle.peer_set(0).await;
            let _ = oracle.subscribe().await;
            oracle.block(address.clone()).await;

            // Sender operations should not panic after shutdown, and should report no recipients
            let sent = sender
                .send(Recipients::All, address.as_ref(), true)
                .await
                .unwrap();
            assert!(sent.is_empty());
        });
    }

    fn clean_shutdown(seed: u64) {
        let cfg = deterministic::Config::default()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(30)));
        let executor = deterministic::Runner::new(cfg);
        executor.start(|context| async move {
            let peer = ed25519::PrivateKey::from_seed(0);

            let peer_context = context.with_label("peer");
            let config = Config::test(
                peer.clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000),
                vec![],
                MAX_MESSAGE_SIZE,
            );
            let (mut network, mut oracle) =
                Network::new(peer_context.with_label("network"), config);

            // Register channel and peer set
            let (_, _) =
                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            let peers: Set<ed25519::PublicKey> = vec![peer.public_key()].try_into().unwrap();
            oracle.update(0, peers).await;

            // Start the network
            let handle = network.start();

            // Allow tasks to start
            context.sleep(Duration::from_millis(100)).await;

            // Count running tasks under the network prefix
            let running_before = count_running_tasks(&context, "peer_network");
            assert!(
                running_before > 0,
                "at least one network task should be running"
            );

            // Abort the network
            handle.abort();
            let _ = handle.await;

            // Give the runtime a tick to process aborts
            context.sleep(Duration::from_millis(100)).await;

            // Verify all network tasks are stopped
            let running_after = count_running_tasks(&context, "peer_network");
            assert_eq!(
                running_after, 0,
                "all network tasks should be stopped, but {running_after} still running"
            );
        });
    }

    #[test_traced]
    fn test_clean_shutdown() {
        for seed in 0..25 {
            clean_shutdown(seed);
        }
    }

    #[test]
    fn test_broadcast_slow_peer_no_blocking() {
        let executor = deterministic::Runner::timed(Duration::from_secs(5));
        executor.start(|context| async move {
            // Create router
            let cfg = RouterConfig { mailbox_size: 10 };
            let (router, mut mailbox, messenger) =
                RouterActor::<_, ed25519::PublicKey>::new(context.clone(), cfg);

            // Create channels for the router
            let channels = channels::Channels::new(messenger.clone(), MAX_MESSAGE_SIZE);
            let _handle = router.start(channels);

            // Register the slow peer with a small buffer. A `futures` bounded channel holds
            // `buffer` messages plus one reserved slot per sender, so it fills after about 10.
            // The receivers must stay alive so the channels do not close.
            let slow_peer = ed25519::PrivateKey::from_seed(0).public_key();
            let (slow_low, _slow_low_rx) = mpsc::channel(10);
            let (slow_high, _slow_high_rx) = mpsc::channel(10);
            assert!(
                mailbox
                    .ready(slow_peer.clone(), Relay::new(slow_low, slow_high))
                    .await
                    .is_some(),
                "Failed to register slow peer"
            );

            // Register the fast peer with a large buffer
            let fast_peer = ed25519::PrivateKey::from_seed(1).public_key();
            let (fast_low, mut fast_receiver) = mpsc::channel(100);
            let (fast_high, _fast_high_rx) = mpsc::channel(100);
            assert!(
                mailbox
                    .ready(fast_peer.clone(), Relay::new(fast_low, fast_high))
                    .await
                    .is_some(),
                "Failed to register fast peer"
            );

            let message = Bytes::from(vec![0u8; 100]);
            let mut messenger = messenger;

            // Send 10 broadcasts to fill (or nearly fill) slow_peer's buffer
            for i in 0..10 {
                let sent = messenger
                    .content(Recipients::All, 0, message.clone(), false)
                    .await;
                assert_eq!(sent.len(), 2, "Broadcast {i} should reach both peers");
            }

            // 11th broadcast: slow_peer's copy may be dropped once its buffer is full,
            // but the broadcast must still reach fast_peer without blocking
            let sent = messenger.content(Recipients::All, 0, message, false).await;
            assert!(
                sent.contains(&fast_peer),
                "Fast peer should receive message"
            );

            // Verify fast_peer received all 11 messages (each call must yield a message)
            for _ in 0..11 {
                assert!(matches!(fast_receiver.try_next(), Ok(Some(_))));
            }
        });
    }
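
    // Hypothetical, std-only sketch (not this crate's router or `Relay` API) of
    // the property `test_broadcast_slow_peer_no_blocking` checks: a broadcast
    // fans out with a non-blocking `try_send` into each peer's bounded buffer,
    // so a full buffer drops the message for that peer only instead of stalling
    // delivery to everyone else. It uses `std::sync::mpsc::sync_channel` in
    // place of the `futures` channels above and assumes drop-on-full is the
    // intended policy; `sketch_fan_out` is an illustrative name.
    fn sketch_fan_out(buffers: &[std::sync::mpsc::SyncSender<u8>], message: u8) -> Vec<usize> {
        let mut delivered = Vec::new();
        for (idx, buffer) in buffers.iter().enumerate() {
            // `try_send` never blocks: it fails with `Full` when the buffer has
            // no free slot and with `Disconnected` when the receiver is gone.
            if buffer.try_send(message).is_ok() {
                delivered.push(idx);
            }
        }
        delivered
    }

    #[test]
    fn test_sketch_fan_out_drops_only_for_full_buffer() {
        // Index 0 is the slow peer (capacity 10), index 1 the fast peer (capacity 100).
        let (slow_tx, _slow_rx) = std::sync::mpsc::sync_channel::<u8>(10);
        let (fast_tx, fast_rx) = std::sync::mpsc::sync_channel::<u8>(100);
        let buffers = vec![slow_tx, fast_tx];

        // The first 10 broadcasts reach both peers.
        for _ in 0..10 {
            assert_eq!(sketch_fan_out(&buffers, 0), vec![0, 1]);
        }

        // The 11th is dropped for the full slow buffer but still reaches the fast peer.
        assert_eq!(sketch_fan_out(&buffers, 0), vec![1]);
        assert_eq!(fast_rx.try_iter().count(), 11);
    }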
}