//! Communicate with a fixed set of authenticated peers without known addresses over encrypted connections.
//!
//! `discovery` provides multiplexed communication between fully-connected peers
//! identified by a developer-specified cryptographic identity (e.g. BLS or ed25519).
//! Peer discovery occurs automatically using ordered bit vectors (sorted by authorized
//! cryptographic identities) to efficiently communicate knowledge of dialable peers.
//!
//! # Features
//!
//! - Configurable Cryptography Scheme for Peer Identities (BLS, ed25519, etc.)
//! - Automatic Peer Discovery Using Bit Vectors (Also Used as Ping Messages)
//! - Multiplexing With Configurable Rate Limiting Per Channel and Send Prioritization
//!
//! # Design
//!
//! ## Discovery
//!
//! Peer discovery operates under the assumption that all peers are aware of and synchronized on
//! the composition of peer sets at specific, user-provided indices (`u64`). Each index maps to a
//! list of authorized `PublicKey`s (`(u64, Vec<PublicKey>)`). Based on this shared knowledge, each
//! peer can construct a sorted bit vector message (`BitVec`) representing its knowledge of the
//! dialable addresses ([std::net::SocketAddr]) of the peers in that set.
//! The `BitVec` message contains:
//! - `index`: The `u64` index the bit vector applies to.
//! - `bits`: The bit vector itself, where a '1' signifies knowledge of the corresponding
//!   peer's address in the sorted list for that index.
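//!
//! As an illustration only, the bit vector for one peer set could be derived as below. This is a
//! minimal sketch, assuming keys are plain byte vectors and that "known" means a dialable address
//! is on record; `PeerKey`, `BitVecMsg`, and `build_bit_vec` are hypothetical names, not this
//! module's (internal) types:
//!
//! ```rust
//! use std::collections::HashSet;
//!
//! /// Hypothetical stand-in for a peer's public key.
//! type PeerKey = Vec<u8>;
//!
//! /// Hypothetical mirror of the `BitVec` message described above.
//! #[derive(Debug, PartialEq)]
//! struct BitVecMsg {
//!     index: u64,
//!     bits: Vec<bool>,
//! }
//!
//! /// Build the bit vector for the peer set at `index`.
//! ///
//! /// Keys are sorted so every peer maps the same position to the same key;
//! /// bit `i` is `true` when a dialable address is known for the `i`-th key.
//! fn build_bit_vec(index: u64, peer_set: &[PeerKey], known: &HashSet<PeerKey>) -> BitVecMsg {
//!     let mut sorted: Vec<&PeerKey> = peer_set.iter().collect();
//!     sorted.sort();
//!     let bits = sorted.iter().map(|key| known.contains(*key)).collect();
//!     BitVecMsg { index, bits }
//! }
//! ```
//!
//! Sorting inside the function means callers only need to agree on set membership, not on the
//! order in which keys were supplied.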
//!
//! Even if a peer knows an address for another member of the set, it treats that information as
//! stale after failing to dial that peer a (configurable) number of times and marks it as '0'
//! (unknown) in the bit vector. This prevents peers from getting "stuck" with outdated
//! information about other peers.
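//!
//! The staleness rule can be sketched as a per-address failure counter. This is a hypothetical
//! illustration: `AddressRecord`, its methods, and the `max_dial_failures` parameter (standing in
//! for the configurable threshold) are assumptions, not this module's API:
//!
//! ```rust
//! /// Hypothetical record of what one peer knows about another peer's address.
//! struct AddressRecord {
//!     known: bool,
//!     dial_failures: u32,
//! }
//!
//! impl AddressRecord {
//!     /// Count a failed dial; at `max_dial_failures` the address is deemed stale.
//!     fn dial_failed(&mut self, max_dial_failures: u32) {
//!         self.dial_failures = self.dial_failures.saturating_add(1);
//!         if self.dial_failures >= max_dial_failures {
//!             self.known = false;
//!         }
//!     }
//!
//!     /// Assumption: a successful dial confirms the address and resets the count.
//!     fn dial_succeeded(&mut self) {
//!         self.known = true;
//!         self.dial_failures = 0;
//!     }
//!
//!     /// The bit this peer would advertise for the address ('1' = known).
//!     fn bit(&self) -> bool {
//!         self.known
//!     }
//! }
//! ```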
//!
//! _Warning: If peers are not synchronized on the peer set composition at a given index,
//! discovery messages can be misinterpreted. A peer might associate a bit position with the
//! wrong peer or fail to parse the vector if its length doesn't match the expected set size. The
//! application layer is responsible for ensuring peer set synchronization._
//!
//! Due to their small size, these `BitVec` messages are exchanged periodically (configured by
//! `gossip_bit_vec_frequency` in the [Config]) between connected peers. This serves as both a
//! peer discovery mechanism and a keep-alive "ping" message to maintain the underlying
//! connection, especially during periods of low application-level traffic. The protocol supports
//! tracking multiple peer sets concurrently (up to `tracked_peer_sets`), each identified by its
//! `index`. This is useful, for instance, during transitions like distributed key generation
//! (DKG) where connections to both old and new peer sets are needed simultaneously.
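//!
//! A bounded registry of peer sets might look like the sketch below. It assumes, purely for
//! illustration, that once more than `tracked_peer_sets` sets are registered the lowest (oldest)
//! index is evicted; `TrackedSets` is a hypothetical type, not this module's tracker:
//!
//! ```rust
//! use std::collections::BTreeMap;
//!
//! /// Hypothetical registry of peer sets keyed by their `u64` index.
//! struct TrackedSets {
//!     max_sets: usize,
//!     sets: BTreeMap<u64, Vec<Vec<u8>>>,
//! }
//!
//! impl TrackedSets {
//!     fn new(max_sets: usize) -> Self {
//!         Self { max_sets, sets: BTreeMap::new() }
//!     }
//!
//!     /// Register the peer set at `index`, evicting the lowest indices while
//!     /// more than `max_sets` sets are tracked.
//!     fn track(&mut self, index: u64, peers: Vec<Vec<u8>>) {
//!         self.sets.insert(index, peers);
//!         while self.sets.len() > self.max_sets {
//!             // `pop_first` removes the entry with the smallest key.
//!             self.sets.pop_first();
//!         }
//!     }
//! }
//! ```
//!
//! Keying by index in a `BTreeMap` keeps the sets ordered, so "oldest" is simply the first entry.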
//!
//! Upon receiving a `BitVec` message, a peer compares it against its own knowledge for the same
//! index. If the receiving peer knows addresses that the sender marked as '0' (unknown), it
//! selects a random subset of these known `Info` structures (up to `peer_gossip_max_count`)
//! and sends them back in a `Payload::Peers` message. To save bandwidth, peers only gossip
//! `Info` for peers they are currently connected to. This prevents them from repeatedly
//! sending `Info` that they cannot verify is still valid. Each `Info` contains:
//! - `socket`: The [std::net::SocketAddr] of the peer.
//! - `timestamp`: A `u64` timestamp indicating when the address was attested.
//! - `public_key`: The peer's public key.
//! - `signature`: The peer's cryptographic signature over the `socket` and `timestamp`.
//!
//! If the receiver doesn't know any addresses the sender is unaware of, it sends no
//! `Payload::Peers` response; the received `BitVec` implicitly acts as a "pong".
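//!
//! The response side can be sketched as a filter plus a bounded random pick. This is
//! hypothetical: `select_gossip` works on plain `bool` slices aligned with the sorted peer set,
//! `XorShift64` is a toy generator standing in for the runtime's RNG, and none of these names
//! are this module's API:
//!
//! ```rust
//! /// Tiny xorshift64 generator (illustration only; not cryptographically
//! /// secure). The seed must be non-zero.
//! struct XorShift64(u64);
//!
//! impl XorShift64 {
//!     fn next_u64(&mut self) -> u64 {
//!         let mut x = self.0;
//!         x ^= x << 13;
//!         x ^= x >> 7;
//!         x ^= x << 17;
//!         self.0 = x;
//!         x
//!     }
//! }
//!
//! /// Choose which positions of the sorted peer set to answer with `Info`.
//! ///
//! /// A position qualifies when the sender's bit is `false` (address unknown to
//! /// them), we hold an `Info` for that peer, and we are currently connected to it.
//! /// At most `max_count` qualifying positions are returned, picked at random.
//! /// An empty result means no `Payload::Peers` reply is sent.
//! fn select_gossip(
//!     sender_bits: &[bool],
//!     have_info: &[bool],
//!     connected: &[bool],
//!     max_count: usize,
//!     rng: &mut XorShift64,
//! ) -> Vec<usize> {
//!     // A length mismatch means the peers disagree on the set; ignore the message.
//!     if sender_bits.len() != have_info.len() || sender_bits.len() != connected.len() {
//!         return Vec::new();
//!     }
//!     let mut candidates: Vec<usize> = (0..sender_bits.len())
//!         .filter(|&i| !sender_bits[i] && have_info[i] && connected[i])
//!         .collect();
//!     // Partial Fisher-Yates shuffle: randomize only the first `k` slots.
//!     let k = max_count.min(candidates.len());
//!     for i in 0..k {
//!         let remaining = (candidates.len() - i) as u64;
//!         let j = i + (rng.next_u64() % remaining) as usize;
//!         candidates.swap(i, j);
//!     }
//!     candidates.truncate(k);
//!     candidates
//! }
//! ```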
//!
//! If a peer receives an `Info` message (either directly or through gossip) containing a more
//! recent timestamp for a known peer's address, it updates its local `Record`. This updated
//! `Info` is also used in future gossip messages. Each peer generates its own signed
//! `Info` upon startup and sends it immediately after establishing a connection (following
//! the cryptographic handshake). This ensures that if a peer connects using an outdated address
//! record, it will be corrected promptly by the peer being dialed.
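//!
//! In other words, the local record keeps the newest attestation it has seen. A minimal sketch,
//! assuming the `Info` signature has already been verified; `PeerInfo` and `maybe_update` are
//! hypothetical and mirror only the `socket` and `timestamp` fields listed above:
//!
//! ```rust
//! use std::net::SocketAddr;
//!
//! /// Hypothetical mirror of the address-bearing fields of `Info`.
//! #[derive(Clone, Debug, PartialEq)]
//! struct PeerInfo {
//!     socket: SocketAddr,
//!     timestamp: u64,
//! }
//!
//! /// Replace `current` with `incoming` only if `incoming` is strictly newer.
//! /// Returns `true` when the record changed (and would be used in future gossip).
//! fn maybe_update(current: &mut Option<PeerInfo>, incoming: PeerInfo) -> bool {
//!     let is_newer = current
//!         .as_ref()
//!         .map_or(true, |existing| incoming.timestamp > existing.timestamp);
//!     if is_newer {
//!         *current = Some(incoming);
//!     }
//!     is_newer
//! }
//! ```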
//!
//! To initiate the discovery process, a peer needs a list of `bootstrappers` (defined in
//! [Config]): known peer public keys and their corresponding socket addresses. The peer
//! attempts to dial these bootstrappers, performs the handshake, sends its own `Info`, and
//! then sends a `BitVec` for the relevant peer set(s) (initially only knowing its own address,
//! marked as '1'). It then waits for responses, learning about other peers through the
//! `Payload::Peers` messages received. Bootstrapper information is persisted, and connections to
//! them are maintained even if they aren't part of any currently tracked peer sets. Different
//! peers can have different bootstrapper lists.
//!
//! _Note: If a peer (listener) receives a connection request from another peer (dialer) that
//! belongs to a registered peer set, the listener will accept the connection, even if the
//! listener itself hasn't yet learned about that specific peer set (or has an older version). The
//! core requirement is that the listener recognizes the *dialer's public key* as belonging to
//! *some* authorized set it tracks (see `actors::tracker::Actor`). This mechanism allows peers
//! with more up-to-date peer set information to connect and propagate that information, enabling
//! the listener to potentially learn about newer sets it is part of._
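//!
//! Put differently, the inbound check is a membership test across every tracked set, not just the
//! newest one. A hypothetical sketch (`accept_inbound`, `PeerKey`, and the `BTreeMap` layout are
//! assumptions, not this module's API):
//!
//! ```rust
//! use std::collections::{BTreeMap, HashSet};
//!
//! /// Hypothetical stand-in for a 32-byte public key.
//! type PeerKey = [u8; 32];
//!
//! /// Accept an inbound dialer if its key appears in *any* tracked peer set,
//! /// regardless of whether that set is the newest one we know about.
//! fn accept_inbound(tracked: &BTreeMap<u64, HashSet<PeerKey>>, dialer: &PeerKey) -> bool {
//!     tracked.values().any(|set| set.contains(dialer))
//! }
//! ```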
//!
//! ## Messages
//!
//! Application-level data is exchanged using the `Payload::Data` message type, which wraps an
//! internal `Data` structure. This structure contains:
//! - `channel`: A `u32` identifier used to route the message to the correct application handler.
//! - `message`: The arbitrary application payload as `IoBuf`.
//!
//! The `message` must not exceed the configured `max_message_size` (in bytes). If it does, the
//! send operation fails with [Error::MessageTooLarge]. Messages can be sent with `priority`,
//! allowing certain communications to potentially bypass lower-priority messages waiting in send
//! queues across all channels. Each registered channel ([Sender], [Receiver]) handles its own
//! message queuing and rate limiting.
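//!
//! The size check and the effect of `priority` can be pictured with the sketch below. It assumes a
//! single outbound queue with two priority levels; `OutboundQueue` and `SendError` are
//! hypothetical (the latter only mimics [Error::MessageTooLarge]), and the real per-channel
//! queuing may differ:
//!
//! ```rust
//! use std::collections::VecDeque;
//!
//! /// Hypothetical error mirroring `Error::MessageTooLarge`.
//! #[derive(Debug, PartialEq)]
//! enum SendError {
//!     MessageTooLarge(usize),
//! }
//!
//! /// Hypothetical outbound queue with two priority levels.
//! struct OutboundQueue {
//!     max_message_size: usize,
//!     high: VecDeque<Vec<u8>>,
//!     low: VecDeque<Vec<u8>>,
//! }
//!
//! impl OutboundQueue {
//!     fn new(max_message_size: usize) -> Self {
//!         Self { max_message_size, high: VecDeque::new(), low: VecDeque::new() }
//!     }
//!
//!     /// Reject oversized messages up front; otherwise enqueue by priority.
//!     fn push(&mut self, message: Vec<u8>, priority: bool) -> Result<(), SendError> {
//!         if message.len() > self.max_message_size {
//!             return Err(SendError::MessageTooLarge(message.len()));
//!         }
//!         if priority {
//!             self.high.push_back(message);
//!         } else {
//!             self.low.push_back(message);
//!         }
//!         Ok(())
//!     }
//!
//!     /// High-priority messages are always drained before low-priority ones.
//!     fn pop(&mut self) -> Option<Vec<u8>> {
//!         self.high.pop_front().or_else(|| self.low.pop_front())
//!     }
//! }
//! ```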
//!
//! ## Compression
//!
//! Stream compression is not provided at the transport layer to avoid inadvertently
//! enabling known attacks such as BREACH and CRIME. These attacks exploit the interaction
//! between compression and encryption by observing the length of the compressed, encrypted output.
//! By compressing secrets alongside attacker-controlled content, these attacks can infer
//! sensitive information through compression ratio analysis. Applications that choose
//! to compress data should do so with full awareness of these risks and implement
//! appropriate mitigations (such as ensuring no attacker-controlled data is compressed
//! alongside sensitive information).
//!
//! ## Rate Limiting
//!
//! There are five primary rate limits:
//!
//! - `max_concurrent_handshakes`: The maximum number of concurrent handshake attempts allowed.
//! - `allowed_handshake_rate_per_ip`: The rate limit for handshake attempts originating from a single IP address.
//! - `allowed_handshake_rate_per_subnet`: The rate limit for handshake attempts originating from a single IP subnet.
//! - `allowed_connection_rate_per_peer`: The rate limit for connections to a single peer (incoming or outgoing).
//! - `rate` (per channel): The rate limit for messages sent on a single channel.
//!
//! _Users should treat these rate limits as best-effort protection against moderate abuse. Targeted abuse (e.g. DDoS)
//! must be mitigated with an external proxy (that limits inbound connection attempts to authorized IPs)._
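//!
//! For intuition about a per-channel `rate`, here is a plain token bucket. It is a hypothetical
//! sketch: `TokenBucket` is not part of this crate, and the crate's `Quota` is not necessarily
//! enforced this way:
//!
//! ```rust
//! use std::time::Instant;
//!
//! /// Hypothetical token bucket: holds up to `capacity` tokens, refilled
//! /// continuously at `refill_per_sec` tokens per second.
//! struct TokenBucket {
//!     capacity: f64,
//!     tokens: f64,
//!     refill_per_sec: f64,
//!     last: Instant,
//! }
//!
//! impl TokenBucket {
//!     /// Start full, with `burst` tokens available at `now`.
//!     fn new(per_second: u32, burst: u32, now: Instant) -> Self {
//!         Self {
//!             capacity: f64::from(burst),
//!             tokens: f64::from(burst),
//!             refill_per_sec: f64::from(per_second),
//!             last: now,
//!         }
//!     }
//!
//!     /// Returns `true` if one message may be sent at `now`, consuming a token.
//!     fn try_acquire(&mut self, now: Instant) -> bool {
//!         let elapsed = now.saturating_duration_since(self.last).as_secs_f64();
//!         self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
//!         self.last = now;
//!         if self.tokens >= 1.0 {
//!             self.tokens -= 1.0;
//!             true
//!         } else {
//!             false
//!         }
//!     }
//! }
//! ```
//!
//! Passing `now` explicitly (rather than calling `Instant::now()` inside) keeps the sketch
//! deterministic, in the same spirit as the deterministic runtime used in this module's tests.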
//!
//! ## Message Delivery
//!
//! Outgoing messages are dropped when a peer's send buffer is full, preventing slow peers
//! from blocking sends to other peers. Incoming messages are dropped when the application's
//! receive buffer is full, ensuring protocol messages (`BitVec`, `Peers`) continue to flow and
//! connections remain healthy.
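//!
//! Both policies amount to a non-blocking send into a bounded buffer. A sketch using the standard
//! library's `std::sync::mpsc::sync_channel` (the crate uses its own asynchronous channels;
//! `deliver_or_drop` and `bounded_buffer` are hypothetical helpers that only illustrate the policy):
//!
//! ```rust
//! use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
//!
//! /// Create a buffer that holds at most `capacity` pending messages.
//! fn bounded_buffer(capacity: usize) -> (SyncSender<Vec<u8>>, Receiver<Vec<u8>>) {
//!     sync_channel(capacity)
//! }
//!
//! /// Hand a message to a bounded buffer without ever blocking.
//! ///
//! /// Returns `true` if the message was buffered and `false` if it was dropped
//! /// because the buffer was full (or the receiving side has gone away).
//! fn deliver_or_drop(buffer: &SyncSender<Vec<u8>>, message: Vec<u8>) -> bool {
//!     match buffer.try_send(message) {
//!         Ok(()) => true,
//!         // Slow consumer: drop instead of stalling the caller.
//!         Err(TrySendError::Full(_)) => false,
//!         // Consumer is gone: nothing to deliver to.
//!         Err(TrySendError::Disconnected(_)) => false,
//!     }
//! }
//! ```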
//!
//! # Example
//!
//! ```rust
//! use commonware_p2p::{authenticated::discovery::{self, Network}, Ingress, Manager, Sender, Recipients};
//! use commonware_cryptography::{ed25519, PrivateKey as _, PublicKey as _, Signer};
//! use commonware_runtime::{deterministic, IoBuf, Metrics, Quota, Runner, Spawner};
//! use commonware_utils::NZU32;
//! use std::net::{IpAddr, Ipv4Addr, SocketAddr};
//!
//! // Configure context
//! let runtime_cfg = deterministic::Config::default();
//! let runner = deterministic::Runner::new(runtime_cfg);
//!
//! // Generate identity
//! //
//! // In production, the signer should be generated from a secure source of entropy.
//! let signer = ed25519::PrivateKey::from_seed(0);
//!
//! // Generate peers
//! //
//! // In production, peer identities will be provided by some external source of truth
//! // (like the staking set of a blockchain).
//! let peer1 = ed25519::PrivateKey::from_seed(1).public_key();
//! let peer2 = ed25519::PrivateKey::from_seed(2).public_key();
//! let peer3 = ed25519::PrivateKey::from_seed(3).public_key();
//!
//! // Configure bootstrappers
//! //
//! // In production, it is likely that the address of bootstrappers will be some public address.
//! let bootstrapper_addr: Ingress = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3001).into();
//! let bootstrappers = vec![(peer1.clone(), bootstrapper_addr)];
//!
//! // Configure namespace
//! //
//! // In production, use a unique application namespace to prevent cryptographic replay attacks.
//! let application_namespace = b"my-app-namespace";
//!
//! // Configure network
//! //
//! // In production, use a more conservative configuration like `Config::recommended`.
//! const MAX_MESSAGE_SIZE: u32 = 1_024; // 1KB
//! let p2p_cfg = discovery::Config::local(
//!     signer.clone(),
//!     application_namespace,
//!     SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 3000),
//!     SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3000), // Use a specific dialable addr
//!     bootstrappers,
//!     MAX_MESSAGE_SIZE,
//! );
//!
//! // Start context
//! runner.start(|context| async move {
//!     // Initialize network
//!     let (mut network, mut oracle) = Network::new(context.with_label("network"), p2p_cfg);
//!
//!     // Register authorized peers
//!     //
//!     // In production, this would be updated as new peer sets are created (like when
//!     // the composition of a validator set changes).
//!     oracle.track(0, [signer.public_key(), peer1, peer2, peer3].try_into().unwrap()).await;
//!
//!     // Register some channel
//!     const MAX_MESSAGE_BACKLOG: usize = 128;
//!     let (mut sender, receiver) = network.register(
//!         0,
//!         Quota::per_second(NZU32!(1)),
//!         MAX_MESSAGE_BACKLOG,
//!     );
//!
//!     // Run network
//!     network.start();
//!
//!     // Example: Use sender
//!     let _ = sender.send(Recipients::All, IoBuf::from(b"hello"), false).await;
//!
//!     // Graceful shutdown (stops all spawned tasks)
//!     context.stop(0, None).await.unwrap();
//! });
//! ```

mod actors;
mod channels;
mod config;
mod metrics;
mod network;
mod types;

use thiserror::Error;

/// Errors that can occur when interacting with the network.
#[derive(Error, Debug)]
pub enum Error {
    /// The message exceeds the configured `max_message_size`.
    #[error("message too large: {0}")]
    MessageTooLarge(usize),
    /// The network has been closed.
    #[error("network closed")]
    NetworkClosed,
}

pub use actors::tracker::Oracle;
pub use channels::{Receiver, Sender};
pub use config::{Bootstrapper, Config};
pub use network::Network;

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        authenticated::{
            discovery::actors::router::{Actor as RouterActor, Config as RouterConfig},
            relay::Relay,
        },
        Blocker, Ingress, Manager, Provider, Receiver, Recipients, Sender,
    };
    use commonware_cryptography::{ed25519, Signer as _};
    use commonware_macros::{select, select_loop, test_group, test_traced};
    use commonware_runtime::{
        count_running_tasks, deterministic, tokio, BufferPooler, Clock, Handle, IoBuf, Metrics,
        Network as RNetwork, Quota, Resolver, Runner, Spawner,
    };
    use commonware_utils::{channel::mpsc, hostname, ordered::Set, TryCollect, NZU32};
    use rand_core::{CryptoRngCore, RngCore};
    use std::{
        collections::HashSet,
        net::{IpAddr, Ipv4Addr, SocketAddr},
        time::Duration,
    };

    #[derive(Copy, Clone)]
    enum Mode {
        All,
        Some,
        One,
    }

    const MAX_MESSAGE_SIZE: u32 = 1_024 * 1_024; // 1MB
    const DEFAULT_MESSAGE_BACKLOG: usize = 128;

    /// Ensure no message rate limiting occurred.
    ///
    /// If a message is rate limited, it would be formatted as:
    ///
    /// ```text
    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="data_0"} 1
    /// peer-9_network_spawner_messages_rate_limited_total{peer="e2e8aa145e1ec5cb01ebfaa40e10e12f0230c832fd8135470c001cb86d77de00",message="ping"} 1
    /// ```
    fn assert_no_rate_limiting(context: &impl Metrics) {
        let metrics = context.encode();
        assert!(
            !metrics.contains("messages_rate_limited_total{"),
            "no messages should be rate limited: {metrics}"
        );
    }

    /// Test connectivity between `n` peers.
    ///
    /// We set a unique `base_port` for each test to avoid "address already in use"
    /// errors when tests are run immediately after each other.
    async fn run_network(
        context: impl Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
        max_message_size: u32,
        base_port: u16,
        n: usize,
        mode: Mode,
    ) {
        // Create peers
        let mut peers = Vec::new();
        for i in 0..n {
            peers.push(ed25519::PrivateKey::from_seed(i as u64));
        }
        let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

        // Create networks
        let (complete_sender, mut complete_receiver) = mpsc::channel(peers.len());
        for (i, peer) in peers.iter().enumerate() {
            // Create peer context
            let context = context.with_label(&format!("peer_{i}"));

            // Derive port
            let port = base_port + i as u16;

            // Create bootstrappers
            let mut bootstrappers = Vec::new();
            if i > 0 {
                bootstrappers.push((
                    addresses[0].clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port).into(),
                ));
            }

            // Create network
            let signer = peer.clone();
            let config = Config::test(
                signer.clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                bootstrappers,
                max_message_size,
            );
            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

            // Register peers
            oracle.track(0, addresses.clone().try_into().unwrap()).await;

            // Register basic application
            let (mut sender, mut receiver) =
                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);

            // Wait to connect to all peers, and then send messages to everyone
            network.start();

            // Send/Receive messages
            context.with_label("agent").spawn({
                let complete_sender = complete_sender.clone();
                let addresses = addresses.clone();
                move |context| async move {
                    // Spawn a task that collects the identity of every other peer
                    let receiver = context.with_label("receiver").spawn(move |_| async move {
                        // Wait for all peers to send their identity
                        let mut received = HashSet::new();
                        while received.len() < n - 1 {
                            // Ensure message equals sender identity
                            let (sender, message) = receiver.recv().await.unwrap();
                            assert_eq!(message, sender.as_ref());

                            // Add to received set
                            received.insert(sender);
                        }
                        complete_sender.send(()).await.unwrap();

                        // Process messages until all finished (or else sender loops could get stuck as a peer may drop)
                        loop {
                            receiver.recv().await.unwrap();
                        }
                    });

                    // Send identity to all peers
                    let msg = signer.public_key();
                    let sender = context
                        .with_label("sender")
                        .spawn(move |context| async move {
                            // Get all peers not including self
                            let mut recipients = addresses.clone();
                            recipients.remove(i);
                            recipients.sort();

                            // Loop forever to account for unexpected message drops
                            loop {
                                match mode {
                                    Mode::One => {
                                        for recipient in &recipients {
                                            // Loop until success
                                            loop {
                                                let sent = sender
                                                    .send(
                                                        Recipients::One(recipient.clone()),
                                                        msg.as_ref().to_vec(),
                                                        true,
                                                    )
                                                    .await
                                                    .unwrap();
                                                if sent.len() != 1 {
                                                    context.sleep(Duration::from_millis(100)).await;
                                                    continue;
                                                }
                                                assert_eq!(&sent[0], recipient);
                                                break;
                                            }
                                        }
                                    }
                                    Mode::Some | Mode::All => {
                                        // Loop until the send succeeds for every peer
                                        loop {
                                            let mut sent = sender
                                                .send(
                                                    match mode {
                                                        Mode::Some => {
                                                            Recipients::Some(recipients.clone())
                                                        }
                                                        Mode::All => Recipients::All,
                                                        _ => unreachable!(),
                                                    },
                                                    msg.as_ref().to_vec(),
                                                    true,
                                                )
                                                .await
                                                .unwrap();
                                            if sent.len() != recipients.len() {
                                                context.sleep(Duration::from_millis(100)).await;
                                                continue;
                                            }

                                            // Compare to expected
                                            sent.sort();
                                            assert_eq!(sent, recipients);
                                            break;
                                        }
                                    }
                                };

                                // Sleep to avoid busy loop
                                context.sleep(Duration::from_secs(10)).await;
                            }
                        });

                    // Neither task should exit
                    select! {
                        receiver = receiver => {
                            panic!("receiver exited: {receiver:?}");
                        },
                        sender = sender => {
                            panic!("sender exited: {sender:?}");
                        },
                    }
                }
            });
        }

        // Wait for all peers to finish
        for _ in 0..n {
            complete_receiver.recv().await.unwrap();
        }

        // Ensure no message rate limiting occurred
        assert_no_rate_limiting(&context);
    }

    fn run_deterministic_test(seed: u64, mode: Mode) {
        // Configure test
        const NUM_PEERS: usize = 25;
        const BASE_PORT: u16 = 3000;

        // Run first instance
        let executor = deterministic::Runner::seeded(seed);
        let state = executor.start(|context| async move {
            run_network(
                context.clone(),
                MAX_MESSAGE_SIZE,
                BASE_PORT,
                NUM_PEERS,
                mode,
            )
            .await;
            context.auditor().state()
        });

        // Compare result to second instance
        let executor = deterministic::Runner::seeded(seed);
        let state2 = executor.start(|context| async move {
            run_network(
                context.clone(),
                MAX_MESSAGE_SIZE,
                BASE_PORT,
                NUM_PEERS,
                mode,
            )
            .await;
            context.auditor().state()
        });
        assert_eq!(state, state2);
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_determinism_one() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::One);
        }
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_determinism_some() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::Some);
        }
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_determinism_all() {
        for i in 0..10 {
            run_deterministic_test(i, Mode::All);
        }
    }

    #[test_traced]
    fn test_tokio_connectivity() {
        let executor = tokio::Runner::default();
        executor.start(|context| async move {
            let base_port = 3000;
            let n = 10;
            run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
        });
    }

    #[test_traced]
    fn test_multi_index_oracle() {
        // Configure test
        let base_port = 3000;
        let n: usize = 100;

        // Initialize context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Create networks
            let mut waiters = Vec::new();
            for (i, peer) in peers.iter().enumerate() {
                // Create peer context
                let context = context.with_label(&format!("peer_{i}"));

                // Derive port
                let port = base_port + i as u16;

                // Create bootstrappers
                let mut bootstrappers = Vec::new();
                if i > 0 {
                    bootstrappers.push((
                        addresses[0].clone(),
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port).into(),
                    ));
                }

                // Create network
                let signer = peer.clone();
                let config = Config::test(
                    signer.clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                    bootstrappers,
                    1_024 * 1_024, // 1MB
                );
                let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

                // Register peers at separate indices
                oracle
                    .track(0, [addresses[0].clone()].try_into().unwrap())
                    .await;
                oracle
                    .track(
                        1,
                        [addresses[1].clone(), addresses[2].clone()]
                            .try_into()
                            .unwrap(),
                    )
                    .await;
                oracle
                    .track(2, addresses.iter().skip(2).cloned().try_collect().unwrap())
                    .await;

                // Register basic application
                let (mut sender, mut receiver) =
                    network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);

                // Wait to connect to all peers, and then send messages to everyone
                network.start();

                // Send/Receive messages
                let handler = context
                    .with_label("agent")
                    .spawn(move |context| async move {
                        if i == 0 {
                            // Loop until success
                            let msg = signer.public_key();
                            loop {
                                if sender
                                    .send(Recipients::All, msg.as_ref().to_vec(), true)
                                    .await
                                    .unwrap()
                                    .len()
                                    == n - 1
                                {
                                    break;
                                }

                                // Sleep and try again (avoid busy loop)
                                context.sleep(Duration::from_millis(100)).await;
                            }
                        } else {
                            // Ensure message equals sender identity
                            let (sender, message) = receiver.recv().await.unwrap();
                            assert_eq!(message, sender.as_ref());
                        }
                    });

                // Add to waiters
                waiters.push(handler);
            }

            // Wait for waiters to finish (receiver before sender)
            for waiter in waiters.into_iter().rev() {
                waiter.await.unwrap();
            }

            // Ensure no message rate limiting occurred
            assert_no_rate_limiting(&context);
        });
    }

632    #[test_traced]
633    fn test_message_too_large() {
634        // Configure test
635        let base_port = 3000;
636        let n: usize = 2;
637
638        // Initialize context
639        let executor = deterministic::Runner::seeded(0);
640        executor.start(|mut context| async move {
641            // Create peers
642            let mut peers = Vec::new();
643            for i in 0..n {
644                peers.push(ed25519::PrivateKey::from_seed(i as u64));
645            }
646            let addresses: Set<_> = peers.iter().map(|p| p.public_key()).try_collect().unwrap();
647
648            // Create network
649            let signer = peers[0].clone();
650            let config = Config::test(
651                signer.clone(),
652                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
653                Vec::new(),
654                1_024 * 1_024, // 1MB
655            );
656            let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
657
658            // Register peers
659            oracle.track(0, addresses.clone()).await;
660
661            // Register basic application
662            let (mut sender, _) =
663                network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
664
665            // Wait to connect to all peers, and then send messages to everyone
666            network.start();
667
668            // Crate random message
669            let mut msg = vec![0u8; 10 * 1024 * 1024]; // 10MB (greater than frame capacity)
670            context.fill_bytes(&mut msg[..]);
671
672            // Send message
673            let recipient = Recipients::One(addresses[1].clone());
674            let result = sender.send(recipient, msg, true).await;
675            assert!(matches!(result, Err(Error::MessageTooLarge(_))));
676        });
677    }
678
679    #[test_traced]
    fn test_rate_limiting() {
        // Configure test
        let base_port = 3000;
        let n: usize = 2;

        // Initialize context
        let executor = deterministic::Runner::seeded(0);
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
            let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
            let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);

            // Create network for peer 0
            let signer0 = peers[0].clone();
            let config0 = Config::test(
                signer0.clone(),
                socket0,
                vec![(peers[1].public_key(), socket1.into())],
                1_024 * 1_024, // 1MB
            );
            let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
            oracle0
                .track(0, addresses.clone().try_into().unwrap())
                .await;
            let (mut sender0, _receiver0) =
                network0.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
            network0.start();

            // Create network for peer 1
            let signer1 = peers[1].clone();
            let config1 = Config::test(
                signer1.clone(),
                socket1,
                vec![(peers[0].public_key(), socket0.into())],
                1_024 * 1_024, // 1MB
            );
            let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
            oracle1
                .track(0, addresses.clone().try_into().unwrap())
                .await;
            let (_sender1, _receiver1) =
                network1.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
            network1.start();

            // Send first message, which should be allowed and consume the quota.
            let msg = vec![0u8; 1024]; // 1KB
            loop {
                // Confirm message is sent to peer
                let sent = sender0
                    .send(Recipients::One(addresses[1].clone()), msg.clone(), true)
                    .await
                    .unwrap();
                if !sent.is_empty() {
                    break;
                }

                // Ensure we don't rate limit outbound sends while
                // waiting for peers to connect
                context.sleep(Duration::from_mins(1)).await
            }

            // Immediately send a second message to trigger the rate limit.
            // Outbound rate limiting skips the rate-limited peer instead of
            // returning an error, so the send succeeds with an empty recipient list.
            let sent = sender0
                .send(Recipients::One(addresses[1].clone()), msg, true)
                .await
                .unwrap();
            assert!(sent.is_empty());

            // Give the metrics time to reflect the rate-limited message.
            for _ in 0..10 {
                assert_no_rate_limiting(&context);
                context.sleep(Duration::from_millis(100)).await;
            }
        });
    }

    #[test_traced]
    fn test_unordered_peer_sets() {
        let (n, base_port) = (10, 3000);
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let mut peers_and_sks = Vec::new();
            for i in 0..n {
                let sk = ed25519::PrivateKey::from_seed(i as u64);
                let pk = sk.public_key();
                let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
                peers_and_sks.push((sk, pk, addr));
            }
            let peer0 = peers_and_sks[0].clone();
            let config = Config::test(
                peer0.0,
                peer0.2,
                vec![(peer0.1.clone(), peer0.2.into())],
                1_024 * 1_024,
            );
            let (network, mut oracle) = Network::new(context.with_label("network"), config);
            network.start();

            // Subscribe to peer sets
            let mut subscription = oracle.subscribe().await;

            // Register initial peer set
            let set10: Set<_> = peers_and_sks
                .iter()
                .take(2)
                .map(|(_, pk, _)| pk.clone())
                .try_collect()
                .unwrap();
            oracle.track(10, set10.clone()).await;
            let (id, new, all) = subscription.recv().await.unwrap();
            assert_eq!(id, 10);
            assert_eq!(new, set10);
            assert_eq!(all, set10);

            // Register an older peer set (index 9 < 10), which should be ignored
            // (no subscription notification is expected for it)
            let set9: Set<_> = peers_and_sks
                .iter()
                .skip(2)
                .map(|(_, pk, _)| pk.clone())
                .try_collect()
                .unwrap();
            oracle.track(9, set9.clone()).await;

            // Register a newer peer set (index 11)
            let set11: Set<_> = peers_and_sks
                .iter()
                .skip(4)
                .map(|(_, pk, _)| pk.clone())
                .try_collect()
                .unwrap();
            oracle.track(11, set11.clone()).await;
            let (id, new, all) = subscription.recv().await.unwrap();
            assert_eq!(id, 11);
            assert_eq!(new, set11);
            let all_keys: Set<_> = set10
                .into_iter()
                .chain(set11.into_iter())
                .try_collect()
                .unwrap();
            assert_eq!(all, all_keys);
        });
    }

    #[test_traced]
    fn test_graceful_shutdown() {
        let base_port = 3000;
        let n: usize = 5;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Create networks for all peers
            let (complete_sender, mut complete_receiver) = mpsc::channel(n);
            for (i, peer) in peers.iter().enumerate() {
                let peer_context = context.with_label(&format!("peer_{i}"));
                let port = base_port + i as u16;

                // Create bootstrappers (everyone connects to peer 0)
                let mut bootstrappers = Vec::new();
                if i > 0 {
                    bootstrappers.push((
                        addresses[0].clone(),
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port).into(),
                    ));
                }

                let signer = peer.clone();
                let config = Config::test(
                    signer.clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                    bootstrappers,
                    1_024 * 1_024, // 1MB
                );
                let (mut network, mut oracle) =
                    Network::new(peer_context.with_label("network"), config);

                // Register peer set
                oracle.track(0, addresses.clone().try_into().unwrap()).await;

                let (mut sender, mut receiver) =
                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
                network.start();

                peer_context.with_label("agent").spawn({
                    let complete_sender = complete_sender.clone();
                    move |context| async move {
                        // Peer 0 (the bootstrapper) waits until it is connected to every
                        // other peer; all other peers wait for at least one connection
                        let expected_connections = if i == 0 { n - 1 } else { 1 };

                        // Broadcast until the message reaches enough peers
                        let msg = signer.public_key();
                        loop {
                            let sent = sender
                                .send(Recipients::All, msg.as_ref().to_vec(), true)
                                .await
                                .unwrap();
                            if sent.len() >= expected_connections {
                                break;
                            }
                            context.sleep(Duration::from_millis(100)).await;
                        }

                        // Signal that this peer is connected
                        complete_sender.send(()).await.unwrap();

                        // Keep receiving messages until shutdown
                        select_loop! {
                            context,
                            on_stopped => {},
                            Ok(_) = receiver.recv() else break => {},
                        }
                    }
                });
            }

            // Wait for all peers to establish connectivity
            for _ in 0..n {
                complete_receiver.recv().await.unwrap();
            }

            // Verify that network actors started for all peers
            let metrics_before = context.encode();
            let is_running = |name: &str| -> bool {
                metrics_before.lines().any(|line| {
                    line.starts_with("runtime_tasks_running{")
                        && line.contains(&format!("name=\"{name}\""))
                        && line.contains("kind=\"Task\"")
                        && line.trim_end().ends_with(" 1")
                })
            };
            for i in 0..n {
                let prefix = format!("peer_{i}_network");
                assert!(
                    is_running(&format!("{prefix}_tracker")),
                    "peer_{i} tracker should be running"
                );
                assert!(
                    is_running(&format!("{prefix}_router")),
                    "peer_{i} router should be running"
                );
                assert!(
                    is_running(&format!("{prefix}_spawner")),
                    "peer_{i} spawner should be running"
                );
                assert!(
                    is_running(&format!("{prefix}_listener")),
                    "peer_{i} listener should be running"
                );
                assert!(
                    is_running(&format!("{prefix}_dialer")),
                    "peer_{i} dialer should be running"
                );
            }

            // All peers are connected; now trigger a graceful shutdown
            // by stopping the context
            let shutdown_context = context.clone();
            context.with_label("shutdown").spawn(move |_| async move {
                // Trigger graceful shutdown
                let result = shutdown_context.stop(0, Some(Duration::from_secs(5))).await;

                // Shutdown should complete successfully without timeout
                assert!(
                    result.is_ok(),
                    "graceful shutdown should complete: {result:?}"
                );
            });

            // Wait for shutdown to complete
            context.stopped().await.unwrap();

            // Give the runtime a tick to process task completions and update metrics
            context.sleep(Duration::from_millis(100)).await;

            // Verify that all network actors stopped
            let metrics_after = context.encode();
            let is_stopped = |name: &str| -> bool {
                metrics_after.lines().any(|line| {
                    line.starts_with("runtime_tasks_running{")
                        && line.contains(&format!("name=\"{name}\""))
                        && line.contains("kind=\"Task\"")
                        && line.trim_end().ends_with(" 0")
                })
            };
            for i in 0..n {
                let prefix = format!("peer_{i}_network");
                assert!(
                    is_stopped(&format!("{prefix}_tracker")),
                    "peer_{i} tracker should be stopped"
                );
                assert!(
                    is_stopped(&format!("{prefix}_router")),
                    "peer_{i} router should be stopped"
                );
                assert!(
                    is_stopped(&format!("{prefix}_spawner")),
                    "peer_{i} spawner should be stopped"
                );
                assert!(
                    is_stopped(&format!("{prefix}_listener")),
                    "peer_{i} listener should be stopped"
                );
                assert!(
                    is_stopped(&format!("{prefix}_dialer")),
                    "peer_{i} dialer should be stopped"
                );
            }
        });
    }

    #[test_traced]
    fn test_subscription_includes_self_when_registered() {
        let base_port = 3000;
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create self (peer 0) and one other peer
            let self_sk = ed25519::PrivateKey::from_seed(0);
            let self_pk = self_sk.public_key();
            let self_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);

            let other_pk = ed25519::PrivateKey::from_seed(1).public_key();

            // Create network for peer 0 (self)
            let config = Config::test(
                self_sk,
                self_addr,
                vec![], // No bootstrappers
                1_024 * 1_024,
            );
            let (network, mut oracle) = Network::new(context.with_label("network"), config);
            network.start();

            // Subscribe to peer sets
            let mut subscription = oracle.subscribe().await;

            // Register a peer set that does NOT include self
            let peer_set: Set<_> = [other_pk.clone()].try_into().unwrap();
            oracle.track(1, peer_set.clone()).await;

            // Receive subscription notification
            let (id, new, all) = subscription.recv().await.unwrap();
            assert_eq!(id, 1);
            assert_eq!(new.len(), 1);
            assert_eq!(all.len(), 1);

            // Self should NOT be in the new set
            assert!(
                new.position(&self_pk).is_none(),
                "new set should not include self"
            );
            assert!(
                new.position(&other_pk).is_some(),
                "new set should include other"
            );

            // Self should NOT be in the tracked set (not registered)
            assert!(
                all.position(&self_pk).is_none(),
                "tracked peers should not include self"
            );
            assert!(
                all.position(&other_pk).is_some(),
                "tracked peers should include other"
            );

            // Now register a peer set that DOES include self
            let peer_set: Set<_> = [self_pk.clone(), other_pk.clone()].try_into().unwrap();
            oracle.track(2, peer_set.clone()).await;

            // Receive subscription notification
            let (id, new, all) = subscription.recv().await.unwrap();
            assert_eq!(id, 2);
            assert_eq!(new.len(), 2);
            assert_eq!(all.len(), 2);

            // Both peers should be in the new set
            assert!(
                new.position(&self_pk).is_some(),
                "new set should include self"
            );
            assert!(
                new.position(&other_pk).is_some(),
                "new set should include other"
            );

            // Both peers should be in the tracked set
            assert!(
                all.position(&self_pk).is_some(),
                "tracked peers should include self"
            );
            assert!(
                all.position(&other_pk).is_some(),
                "tracked peers should include other"
            );
        });
    }

    #[test_traced]
    fn test_dns_bootstrapper_resolution() {
        let base_port = 3000;
        let n: usize = 3;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Register DNS mapping for the bootstrapper (peer 0)
            let bootstrapper_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
            context.resolver_register("boot.local", Some(vec![bootstrapper_ip]));

            // Create networks
            let (complete_sender, mut complete_receiver) = mpsc::channel(n);
            for (i, peer) in peers.iter().enumerate() {
                let context = context.with_label(&format!("peer_{i}"));
                let port = base_port + i as u16;

                // Create bootstrappers - use DNS for non-zero peers
                let bootstrappers = if i > 0 {
                    vec![(
                        addresses[0].clone(),
                        Ingress::Dns {
                            host: hostname!("boot.local"),
                            port: base_port,
                        },
                    )]
                } else {
                    vec![]
                };

                // Create network
                let signer = peer.clone();
                let config = Config::test(
                    signer.clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                    bootstrappers,
                    1_024 * 1_024,
                );
                let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

                // Register peers
                oracle.track(0, addresses.clone().try_into().unwrap()).await;

                // Register channel
                let (mut sender, mut receiver) =
                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);

                network.start();

                // Send/Receive messages
                context.with_label("agent").spawn({
                    let complete_sender = complete_sender.clone();
                    let addresses = addresses.clone();
                    move |context| async move {
                        // Wait for messages from other peers
                        let receiver = context.with_label("receiver").spawn(move |_| async move {
                            let mut received = HashSet::new();
                            while received.len() < n - 1 {
                                let (sender, message) = receiver.recv().await.unwrap();
                                assert_eq!(message, sender.as_ref());
                                received.insert(sender);
                            }
                            complete_sender.send(()).await.unwrap();

                            loop {
                                receiver.recv().await.unwrap();
                            }
                        });

                        // Send identity to all peers
                        let msg = signer.public_key();
                        let sender =
                            context
                                .with_label("sender")
                                .spawn(move |context| async move {
                                    loop {
                                        let mut recipients = addresses.clone();
                                        recipients.remove(i);
                                        recipients.sort();

                                        loop {
                                            let mut sent = sender
                                                .send(Recipients::All, msg.as_ref().to_vec(), true)
                                                .await
                                                .unwrap();
                                            if sent.len() != recipients.len() {
                                                context.sleep(Duration::from_millis(100)).await;
                                                continue;
                                            }
                                            sent.sort();
                                            assert_eq!(sent, recipients);
                                            break;
                                        }

                                        context.sleep(Duration::from_secs(10)).await;
                                    }
                                });

                        select! {
                            receiver = receiver => {
                                panic!("receiver exited: {receiver:?}")
                            },
                            sender = sender => {
                                panic!("sender exited: {sender:?}")
                            },
                        }
                    }
                });
            }

            // Wait for all peers to exchange messages
            for _ in 0..n {
                complete_receiver.recv().await.unwrap();
            }

            assert_no_rate_limiting(&context);
        });
    }

    #[test_traced]
    fn test_dns_resolution_failure_then_success() {
        let base_port = 3100;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create 2 peers
            let peer0 = ed25519::PrivateKey::from_seed(0);
            let peer1 = ed25519::PrivateKey::from_seed(1);
            let addresses = vec![peer0.public_key(), peer1.public_key()];

            let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
            let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);

            // Do NOT register DNS mapping initially - peer 1 will fail to resolve

            // Create network for peer 0 (bootstrapper, no DNS)
            let config0 = Config::test(peer0.clone(), socket0, vec![], 1_024 * 1_024);
            let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
            oracle0
                .track(0, addresses.clone().try_into().unwrap())
                .await;
            let (mut sender0, mut receiver0) =
                network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            network0.start();

            // Create network for peer 1 with DNS bootstrapper
            let config1 = Config::test(
                peer1.clone(),
                socket1,
                vec![(
                    peer0.public_key(),
                    Ingress::Dns {
                        host: hostname!("boot.local"),
                        port: base_port,
                    },
                )],
                1_024 * 1_024,
            );
            let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
            oracle1
                .track(0, addresses.clone().try_into().unwrap())
                .await;
            let (mut sender1, mut receiver1) =
                network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            network1.start();

            // Wait a bit - peer 1 should fail to connect (DNS not registered)
            context.sleep(Duration::from_secs(2)).await;

            // Verify peer 0 cannot send to peer 1 yet
            let sent = sender0
                .send(Recipients::One(peer1.public_key()), b"test", true)
                .await
                .unwrap();
            assert!(sent.is_empty(), "should not be connected yet");

            // Now register the DNS mapping
            context.resolver_register("boot.local", Some(vec![IpAddr::V4(Ipv4Addr::LOCALHOST)]));

            // Peer 1 should now resolve the bootstrapper via DNS and connect
            let pk0 = peer0.public_key();
            let pk1 = peer1.public_key();
            let msg0 = pk0.to_vec();
            let msg1 = pk1.to_vec();

            // Spawn receiver tasks
            let (done_sender, mut done_receiver) = mpsc::channel::<()>(2);
            let done0 = done_sender.clone();
            let pk1_clone = pk1.clone();
            context.with_label("recv0").spawn(move |_| async move {
                let (sender, message) = receiver0.recv().await.unwrap();
                assert_eq!(sender, pk1_clone);
                assert_eq!(message, msg1.as_slice());
                done0.send(()).await.unwrap();
            });
            let done1 = done_sender.clone();
            let pk0_clone = pk0.clone();
            context.with_label("recv1").spawn(move |_| async move {
                let (sender, message) = receiver1.recv().await.unwrap();
                assert_eq!(sender, pk0_clone);
                assert_eq!(message, msg0.as_slice());
                done1.send(()).await.unwrap();
            });

            // Keep sending messages until both peers are connected
            context.with_label("sender").spawn({
                let pk0 = pk0.clone();
                let pk1 = pk1.clone();
                move |context| async move {
                    loop {
                        let sent0 = sender0
                            .send(Recipients::One(pk1.clone()), pk0.as_ref().to_vec(), true)
                            .await
                            .unwrap();
                        let sent1 = sender1
                            .send(Recipients::One(pk0.clone()), pk1.as_ref().to_vec(), true)
                            .await
                            .unwrap();
                        if !sent0.is_empty() && !sent1.is_empty() {
                            break;
                        }
                        context.sleep(Duration::from_millis(100)).await;
                    }
                }
            });

            // Wait for both receivers to get messages
            done_receiver.recv().await.unwrap();
            done_receiver.recv().await.unwrap();
        });
    }

    /// Helper to run DNS connectivity test with a specific seed and return auditor state.
    fn run_dns_connectivity(seed: u64) -> String {
        let base_port = 3400;
        let n: usize = 3;

        let executor = deterministic::Runner::seeded(seed);
        executor.start(|context| async move {
            // Create peers
            let mut peers = Vec::new();
            for i in 0..n {
                peers.push(ed25519::PrivateKey::from_seed(i as u64));
            }
            let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();

            // Register DNS mapping for the bootstrapper (peer 0)
            let bootstrapper_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
            context.resolver_register("boot.local", Some(vec![bootstrapper_ip]));

            // Create networks
            let (complete_sender, mut complete_receiver) = mpsc::channel(n);
            for (i, peer) in peers.iter().enumerate() {
                let context = context.with_label(&format!("peer_{i}"));
                let port = base_port + i as u16;

                // Use DNS for bootstrapper
                let bootstrappers = if i > 0 {
                    vec![(
                        addresses[0].clone(),
                        Ingress::Dns {
                            host: hostname!("boot.local"),
                            port: base_port,
                        },
                    )]
                } else {
                    vec![]
                };

                let signer = peer.clone();
                let config = Config::test(
                    signer.clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
                    bootstrappers,
                    1_024 * 1_024,
                );
                let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
                oracle.track(0, addresses.clone().try_into().unwrap()).await;
                let (mut sender, mut receiver) =
                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
                network.start();

                context.with_label("agent").spawn({
                    let complete_sender = complete_sender.clone();
                    let addresses = addresses.clone();
                    move |context| async move {
                        let receiver = context.with_label("receiver").spawn(move |_| async move {
                            let mut received = HashSet::new();
                            while received.len() < n - 1 {
                                let (sender, message) = receiver.recv().await.unwrap();
                                assert_eq!(message, sender.as_ref());
                                received.insert(sender);
                            }
                            complete_sender.send(()).await.unwrap();
                            loop {
                                receiver.recv().await.unwrap();
                            }
                        });

                        let msg = signer.public_key();
                        let sender =
                            context
                                .with_label("sender")
                                .spawn(move |context| async move {
                                    loop {
                                        let mut recipients = addresses.clone();
                                        recipients.remove(i);
                                        recipients.sort();

                                        loop {
                                            let mut sent = sender
                                                .send(Recipients::All, msg.as_ref().to_vec(), true)
                                                .await
                                                .unwrap();
                                            if sent.len() != recipients.len() {
                                                context.sleep(Duration::from_millis(100)).await;
                                                continue;
                                            }
                                            sent.sort();
                                            assert_eq!(sent, recipients);
                                            break;
                                        }

                                        context.sleep(Duration::from_secs(10)).await;
                                    }
                                });

                        select! {
                            receiver = receiver => {
                                panic!("receiver exited: {receiver:?}")
                            },
                            sender = sender => {
                                panic!("sender exited: {sender:?}")
                            },
                        }
                    }
                });
            }

            for _ in 0..n {
                complete_receiver.recv().await.unwrap();
            }

            context.auditor().state()
        })
    }

    #[test_traced]
1446    fn test_dns_resolution_determinism() {
1447        // Run same test twice with same seed
1448        let state1 = run_dns_connectivity(42);
1449        let state2 = run_dns_connectivity(42);
1450        assert_eq!(state1, state2, "DNS resolution should be deterministic");
1451    }
1452
    #[test_traced]
    fn test_dns_resolving_to_private_ip_not_dialed() {
        // Test that when allow_private_ips=false, DNS addresses that resolve
        // to private IPs are not dialed.
        let base_port = 3300;
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let peer0 = ed25519::PrivateKey::from_seed(0);
            let peer1 = ed25519::PrivateKey::from_seed(1);

            let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
            let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);

            // Register a DNS mapping that resolves to localhost (a private IP)
            context.resolver_register("boot.local".to_string(), Some(vec![socket0.ip()]));

            let addresses: Vec<_> = vec![peer0.public_key(), peer1.public_key()];

            // Create peer 0 (bootstrapper) with allow_private_ips=true
            let mut config0 = Config::test(peer0.clone(), socket0, vec![], 1_024 * 1_024);
            config0.allow_private_ips = true;
            let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
            oracle0
                .track(0, addresses.clone().try_into().unwrap())
                .await;
            let (_sender0, mut receiver0) =
                network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            network0.start();

            // Create peer 1 with allow_private_ips=false, using a DNS bootstrapper
            let bootstrappers = vec![(
                peer0.public_key(),
                Ingress::Dns {
                    host: hostname!("boot.local"),
                    port: base_port,
                },
            )];
            let mut config1 = Config::test(peer1.clone(), socket1, bootstrappers, 1_024 * 1_024);
            config1.allow_private_ips = false; // This should prevent dialing the private IP
            let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
            oracle1
                .track(0, addresses.clone().try_into().unwrap())
                .await;
            let (mut sender1, _receiver1) =
                network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            network1.start();

            // Wait for a period during which peer 1 would normally connect
            context.sleep(Duration::from_secs(5)).await;

            // Try to send from peer 1; it should reach no one because private IPs are blocked
            let sent = sender1
                .send(Recipients::All, peer1.public_key().as_ref().to_vec(), true)
                .await
                .unwrap();
            assert!(
                sent.is_empty(),
                "peer 1 should not have connected to peer 0 (private IP)"
            );

            // Verify peer 0 received nothing from peer 1
            select! {
                msg = receiver0.recv() => {
                    panic!("peer 0 should not have received any message, got: {msg:?}");
                },
                _ = context.sleep(Duration::from_secs(1)) => {
                    // Expected: timeout with no message
                },
            }
        });
    }

    #[test_traced]
    fn test_dns_mixed_ips_connectivity() {
        // Test that peers can connect even when DNS resolves to multiple IPs
        // where most are unreachable. The dialer randomly picks an IP, so
        // eventually it should pick the reachable one.
        //
        // Run over 25 seeds to ensure we exercise the random IP selection.
        for seed in 0..25 {
            let base_port = 3400;

            let cfg = deterministic::Config::default()
                .with_seed(seed)
                .with_timeout(Some(Duration::from_secs(120)));
            let executor = deterministic::Runner::new(cfg);
            executor.start(|context| async move {
                let peer0 = ed25519::PrivateKey::from_seed(0);
                let peer1 = ed25519::PrivateKey::from_seed(1);

                let good_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
                let socket0 = SocketAddr::new(good_ip, base_port);
                let socket1 = SocketAddr::new(good_ip, base_port + 1);

                // For each peer, register a DNS mapping with 3 unreachable IPs and 1 reachable IP
                let mut all_ips0: Vec<IpAddr> = (1..=3)
                    .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 100 + i)))
                    .collect();
                all_ips0.push(good_ip);
                context.resolver_register("peer-0.local", Some(all_ips0));

                let mut all_ips1: Vec<IpAddr> = (1..=3)
                    .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 110 + i)))
                    .collect();
                all_ips1.push(good_ip);
                context.resolver_register("peer-1.local", Some(all_ips1));

                let addresses: Vec<_> = vec![peer0.public_key(), peer1.public_key()];

                // Create peer 0 with peer 1 as its DNS bootstrapper
                let bootstrappers0 = vec![(
                    peer1.public_key(),
                    Ingress::Dns {
                        host: hostname!("peer-1.local"),
                        port: base_port + 1,
                    },
                )];
                let config0 = Config::test(peer0.clone(), socket0, bootstrappers0, 1_024 * 1_024);
                let (mut network0, mut oracle0) =
                    Network::new(context.with_label("peer_0"), config0);
                oracle0
                    .track(0, addresses.clone().try_into().unwrap())
                    .await;
                let (_sender0, mut receiver0) =
                    network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
                network0.start();

                // Create peer 1 with peer 0 as its DNS bootstrapper
                let bootstrappers1 = vec![(
                    peer0.public_key(),
                    Ingress::Dns {
                        host: hostname!("peer-0.local"),
                        port: base_port,
                    },
                )];
                let config1 = Config::test(peer1.clone(), socket1, bootstrappers1, 1_024 * 1_024);
                let (mut network1, mut oracle1) =
                    Network::new(context.with_label("peer_1"), config1);
                oracle1
                    .track(0, addresses.clone().try_into().unwrap())
                    .await;
                let (mut sender1, _receiver1) =
                    network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
                network1.start();

                // Wait for peers to connect (may take multiple attempts due to random IP selection)
                let pk0 = peer0.public_key();
                loop {
                    let sent = sender1
                        .send(
                            Recipients::One(pk0.clone()),
                            peer1.public_key().as_ref().to_vec(),
                            true,
                        )
                        .await
                        .unwrap();
                    if !sent.is_empty() {
                        break;
                    }
                    context.sleep(Duration::from_millis(100)).await;
                }

                // Verify peer 0 received the message
                let (sender, msg) = receiver0.recv().await.unwrap();
                assert_eq!(sender, peer1.public_key());
                assert_eq!(msg, peer1.public_key().as_ref());
            });
        }
    }

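    // Sketch (hypothetical, not part of the original suite): the arithmetic behind the
    // "eventually it should pick the reachable one" reasoning in
    // `test_dns_mixed_ips_connectivity`. It ASSUMES each dial picks uniformly at random
    // among the resolved IPs and that picks are independent; the helper
    // `miss_probability` is hypothetical. With `bad` unreachable IPs and one reachable
    // IP, one pick misses with probability bad / (bad + 1), so `attempts` consecutive
    // misses happen with probability (bad / (bad + 1))^attempts, which tends to zero.
    fn miss_probability(bad: u32, attempts: i32) -> f64 {
        // Probability that a single uniform pick lands on an unreachable IP.
        let p_miss = f64::from(bad) / (f64::from(bad) + 1.0);
        // Independent picks: the miss probabilities multiply.
        p_miss.powi(attempts)
    }

    #[test]
    fn test_dns_mixed_ips_miss_probability_sketch() {
        // 3 bad IPs + 1 good IP: a single pick misses 3/4 of the time.
        assert!((miss_probability(3, 1) - 0.75).abs() < 1e-12);
        // Ten consecutive misses are already unlikely (about 5.6%).
        assert!(miss_probability(3, 10) < 0.06);
        // Treating the 25 seeds as independent, the chance that every seed's first pick
        // is the good IP is (1/4)^25, so some seed almost surely dials a bad IP first.
        assert!(0.25f64.powi(25) < 1e-14);
    }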
    #[test_traced]
    fn test_many_peer_restart_with_new_address() {
        let base_port = 7500;
        let n = 5;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let peers: Vec<_> = (0..n)
                .map(|i| ed25519::PrivateKey::from_seed(i as u64))
                .collect();
            let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();

            // Track senders/receivers/handles across restarts
            let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
            let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
            let mut handles: Vec<Option<Handle<()>>> = (0..n).map(|_| None).collect();

            // Track port allocations (updated on restart)
            let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();

            // Create networks for all peers (peer 0 is the bootstrapper)
            for (i, peer) in peers.iter().enumerate() {
                let peer_context = context.with_label(&format!("peer_{i}"));

                // Non-bootstrapper peers point to peer 0
                let mut bootstrappers = Vec::new();
                if i > 0 {
                    bootstrappers.push((
                        addresses[0].clone(),
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
                    ));
                }

                let config = Config::test(
                    peer.clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
                    bootstrappers,
                    MAX_MESSAGE_SIZE,
                );
                let (mut network, mut oracle) =
                    Network::new(peer_context.with_label("network"), config);

                // Register peer set
                oracle.track(0, addresses.clone().try_into().unwrap()).await;

                let (sender, receiver) =
                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
                senders[i] = Some(sender);
                receivers[i] = Some(receiver);

                let handle = network.start();
                handles[i] = Some(handle);
            }

            // Wait for full connectivity (each peer can send to all others)
            for (i, sender) in senders.iter_mut().enumerate() {
                let sender = sender.as_mut().unwrap();
                loop {
                    let sent = sender
                        .send(
                            Recipients::All,
                            peers[i].public_key().as_ref().to_vec(),
                            true,
                        )
                        .await
                        .unwrap();
                    if sent.len() == n - 1 {
                        break;
                    }
                    context.sleep(Duration::from_millis(100)).await;
                }
            }

            // Verify each peer can receive from all others
            for receiver in receivers.iter_mut() {
                let receiver = receiver.as_mut().unwrap();
                let mut received = HashSet::new();
                while received.len() < n - 1 {
                    let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
                    assert_eq!(message, sender.as_ref());
                    received.insert(sender);
                }
            }

            // Over three rounds, restart every peer except the bootstrapper (peer 0) on a new port
            let mut restart_counter = 0u16;
            for round in 0..3 {
                for restart_peer_idx in 1..n {
                    // Allocate a new unique port
                    restart_counter += 1;
                    let new_port = base_port + 100 + restart_counter;
                    ports[restart_peer_idx] = new_port;

                    // Abort the peer's network
                    if let Some(handle) = handles[restart_peer_idx].take() {
                        handle.abort();
                    }
                    senders[restart_peer_idx] = None;
                    receivers[restart_peer_idx] = None;

                    // Restart the peer on the new port (using the bootstrapper for discovery)
                    let peer_context =
                        context.with_label(&format!("peer_{restart_peer_idx}_round_{round}"));
                    let bootstrappers = vec![(
                        addresses[0].clone(),
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
                    )];
                    let config = Config::test(
                        peers[restart_peer_idx].clone(),
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), new_port),
                        bootstrappers,
                        MAX_MESSAGE_SIZE,
                    );
                    let (mut network, mut oracle) =
                        Network::new(peer_context.with_label("network"), config);

                    // Register peer set
                    oracle.track(0, addresses.clone().try_into().unwrap()).await;

                    let (sender, receiver) = network.register(
                        0,
                        Quota::per_second(NZU32!(100)),
                        DEFAULT_MESSAGE_BACKLOG,
                    );
                    senders[restart_peer_idx] = Some(sender);
                    receivers[restart_peer_idx] = Some(receiver);

                    let handle = network.start();
                    handles[restart_peer_idx] = Some(handle);

                    // Wait for the restarted peer to reconnect to all others
                    let restarted_sender = senders[restart_peer_idx].as_mut().unwrap();
                    loop {
                        let sent = restarted_sender
                            .send(
                                Recipients::All,
                                peers[restart_peer_idx].public_key().as_ref().to_vec(),
                                true,
                            )
                            .await
                            .unwrap();
                        if sent.len() == n - 1 {
                            break;
                        }
                        context.sleep(Duration::from_millis(100)).await;
                    }

                    // Verify other peers can send to the restarted peer
                    for i in 0..n {
                        if i == restart_peer_idx {
                            continue;
                        }
                        let sender = senders[i].as_mut().unwrap();
                        loop {
                            let sent = sender
                                .send(
                                    Recipients::One(addresses[restart_peer_idx].clone()),
                                    peers[i].public_key().as_ref().to_vec(),
                                    true,
                                )
                                .await
                                .unwrap();
                            if sent.len() == 1 {
                                break;
                            }
                            context.sleep(Duration::from_millis(100)).await;
                        }
                    }

                    // Verify the restarted peer receives messages from all others
                    let restarted_receiver = receivers[restart_peer_idx].as_mut().unwrap();
                    let mut received = HashSet::new();
                    while received.len() < n - 1 {
                        let (sender, message): (ed25519::PublicKey, _) =
                            restarted_receiver.recv().await.unwrap();
                        assert_eq!(message, sender.as_ref());
                        received.insert(sender);
                    }
                }
            }

            assert_no_rate_limiting(&context);
        });
    }

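    // Sketch (hypothetical, not part of the original suite): a standalone check of the
    // port arithmetic in `test_many_peer_restart_with_new_address`. The helper
    // `restart_port_schedule` is an assumption that mirrors that test: peer `i` first
    // listens on `base_port + i`, and the k-th restart overall (k = 1, 2, ...) listens
    // on `base_port + 100 + k`. Peer 0, the bootstrapper, is never restarted. The check
    // shows that no port is handed out twice for the parameters the test uses.
    fn restart_port_schedule(base_port: u16, n: usize, rounds: usize) -> Vec<u16> {
        // Initial listen ports, one per peer.
        let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
        let mut restart_counter = 0u16;
        for _ in 0..rounds {
            // Every peer except the bootstrapper is restarted once per round.
            for _ in 1..n {
                restart_counter += 1;
                ports.push(base_port + 100 + restart_counter);
            }
        }
        ports
    }

    #[test]
    fn test_restart_port_schedule_is_unique_sketch() {
        let ports = restart_port_schedule(7500, 5, 3);
        // 5 initial ports plus 3 rounds of 4 restarts each.
        assert_eq!(ports.len(), 5 + 3 * 4);
        // The last restart lands on 7500 + 100 + 12.
        assert_eq!(ports.last().copied(), Some(7612));
        // Sorting and deduplicating removes nothing, so every port is distinct.
        let mut deduped = ports.clone();
        deduped.sort_unstable();
        deduped.dedup();
        assert_eq!(deduped.len(), ports.len());
    }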
    #[test_traced]
    fn test_simultaneous_peer_restart() {
        let base_port = 7700;
        let n = 5;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let peers: Vec<_> = (0..n)
                .map(|i| ed25519::PrivateKey::from_seed(i as u64))
                .collect();
            let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();

            // Track port allocations (updated on restart)
            let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();

            // Track senders/receivers/handles across restarts
            let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
            let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
            let mut handles: Vec<Option<Handle<()>>> = (0..n).map(|_| None).collect();

            // Create networks for all peers (peer 0 is the bootstrapper)
            for (i, peer) in peers.iter().enumerate() {
                let peer_context = context.with_label(&format!("peer_{i}"));

                // Non-bootstrapper peers point to peer 0
                let mut bootstrappers = Vec::new();
                if i > 0 {
                    bootstrappers.push((
                        addresses[0].clone(),
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
                    ));
                }

                let config = Config::test(
                    peer.clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
                    bootstrappers,
                    MAX_MESSAGE_SIZE,
                );
                let (mut network, mut oracle) =
                    Network::new(peer_context.with_label("network"), config);

                // Register peer set
                oracle.track(0, addresses.clone().try_into().unwrap()).await;

                let (sender, receiver) =
                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
                senders[i] = Some(sender);
                receivers[i] = Some(receiver);

                let handle = network.start();
                handles[i] = Some(handle);
            }

            // Wait for full connectivity (each peer can send to all others)
            for (i, sender) in senders.iter_mut().enumerate() {
                let sender = sender.as_mut().unwrap();
                loop {
                    let sent = sender
                        .send(
                            Recipients::All,
                            peers[i].public_key().as_ref().to_vec(),
                            true,
                        )
                        .await
                        .unwrap();
                    if sent.len() == n - 1 {
                        break;
                    }
                    context.sleep(Duration::from_millis(100)).await;
                }
            }

            // Verify each peer can receive from all others
            for receiver in receivers.iter_mut() {
                let receiver = receiver.as_mut().unwrap();
                let mut received = HashSet::new();
                while received.len() < n - 1 {
                    let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
                    assert_eq!(message, sender.as_ref());
                    received.insert(sender);
                }
            }

            // Shut down all non-bootstrapper peers simultaneously.
            //
            // We keep the bootstrapper (peer 0) alive, since the restarted peers rely on it
            // for discovery, and restart every other peer at once to exercise the case
            // where multiple peers churn simultaneously.
            let restart_peers: Vec<usize> = (1..n).collect();
            for &idx in &restart_peers {
                if let Some(handle) = handles[idx].take() {
                    handle.abort();
                }
                senders[idx] = None;
                receivers[idx] = None;
                // Allocate new ports for restarted peers
                ports[idx] = base_port + 100 + idx as u16;
            }

            // Wait for connections to be detected as closed
            context.sleep(Duration::from_secs(2)).await;

            // Restart all non-bootstrapper peers on their new ports, using the bootstrapper
            // for discovery
            for &idx in &restart_peers {
                let peer_context = context.with_label(&format!("peer_{idx}_restarted"));
                let bootstrappers = vec![(
                    addresses[0].clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
                )];
                let config = Config::test(
                    peers[idx].clone(),
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[idx]),
                    bootstrappers,
                    MAX_MESSAGE_SIZE,
                );
                let (mut network, mut oracle) =
                    Network::new(peer_context.with_label("network"), config);

                // Register peer set
                oracle.track(0, addresses.clone().try_into().unwrap()).await;

                let (sender, receiver) =
                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
                senders[idx] = Some(sender);
                receivers[idx] = Some(receiver);

                let handle = network.start();
                handles[idx] = Some(handle);
            }

            // Wait for full connectivity after restart
            for (i, sender) in senders.iter_mut().enumerate() {
                let sender = sender.as_mut().unwrap();
                loop {
                    let sent = sender
                        .send(
                            Recipients::All,
                            peers[i].public_key().as_ref().to_vec(),
                            true,
                        )
                        .await
                        .unwrap();
                    if sent.len() == n - 1 {
                        break;
                    }
                    context.sleep(Duration::from_millis(100)).await;
                }
            }

            // Verify each peer can receive from all others after restart
            for receiver in receivers.iter_mut() {
                let receiver = receiver.as_mut().unwrap();
                let mut received = HashSet::new();
                while received.len() < n - 1 {
                    let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
                    assert_eq!(message, sender.as_ref());
                    received.insert(sender);
                }
            }

            assert_no_rate_limiting(&context);
        });
    }

    #[test_traced]
    fn test_peer_restart_with_new_address_must_dial() {
        let base_port = 3600;
        let n: usize = 5;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create peers
            let peers: Vec<_> = (0..n)
                .map(|i| ed25519::PrivateKey::from_seed(i as u64))
                .collect();
            let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();

            // Track all senders/receivers/handles across restarts
            let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
            let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
            let mut handles: Vec<Option<Handle<()>>> = (0..n).map(|_| None).collect();

            // Track port allocations for each peer (updated on restart)
            let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();

            // Peer 2 advertises a WRONG dialable address (an unreachable IP), while all other
            // peers advertise correct addresses. When peer 1 restarts, it therefore CANNOT
            // dial peer 2 (wrong address), so peer 2 must dial peer 1 to reconnect.
            let wrong_ip = IpAddr::V4(Ipv4Addr::new(10, 255, 255, 1)); // Unreachable IP
            let wrong_address_peer_idx = 2;

            for (i, peer) in peers.iter().enumerate() {
                let peer_context = context.with_label(&format!("peer_{i}"));
                let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]);

                // Peer 2 advertises the wrong IP; the others advertise their listen addresses
                let dialable_addr: Ingress = if i == wrong_address_peer_idx {
                    SocketAddr::new(wrong_ip, ports[i]).into()
                } else {
                    listen_addr.into()
                };

                // Every peer except peer 0 bootstraps from peer 0
                let mut bootstrappers = Vec::new();
                if i > 0 {
                    bootstrappers.push((
                        addresses[0].clone(),
                        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
                    ));
                }

                let mut config =
                    Config::test(peer.clone(), listen_addr, bootstrappers, 1_024 * 1_024);
                config.dialable = dialable_addr;

                let (mut network, mut oracle) =
                    Network::new(peer_context.with_label("network"), config);

                oracle.track(0, addresses.clone().try_into().unwrap()).await;

                let (sender, receiver) =
                    network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
                senders[i] = Some(sender);
                receivers[i] = Some(receiver);

                let handle = network.start();
                handles[i] = Some(handle);
            }

            // Wait for full connectivity
            for (i, sender) in senders.iter_mut().enumerate() {
                let sender = sender.as_mut().unwrap();
                loop {
                    let sent = sender
                        .send(
                            Recipients::All,
                            peers[i].public_key().as_ref().to_vec(),
                            true,
                        )
                        .await
                        .unwrap();
                    if sent.len() == n - 1 {
                        break;
                    }
                    context.sleep(Duration::from_millis(100)).await;
                }
            }

            // Verify each peer can receive from all others
            for receiver in receivers.iter_mut() {
                let receiver = receiver.as_mut().unwrap();
                let mut received = HashSet::new();
                while received.len() < n - 1 {
                    let (sender, message) = receiver.recv().await.unwrap();
                    assert_eq!(message, sender.as_ref());
                    received.insert(sender);
                }
            }

            // Restart peer 1 on a new port. After the restart, peer 1 can dial peers 0, 3, and 4
            // (correct addresses) but CANNOT dial peer 2 (wrong address). Peer 2 must dial
            // peer 1 to reconnect, after learning peer 1's new address through gossip from
            // the bootstrapper.
            let restart_peer_idx = 1;
            let new_port = base_port + 100;
            ports[restart_peer_idx] = new_port;

            // Abort the peer's network
            if let Some(handle) = handles[restart_peer_idx].take() {
                handle.abort();
            }
            senders[restart_peer_idx] = None;
            receivers[restart_peer_idx] = None;

            // Restart the peer with a NEW port and a CORRECT dialable address
            let peer_context = context.with_label(&format!("peer_{restart_peer_idx}_restarted"));
            let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), new_port);
            let bootstrappers = vec![(
                addresses[0].clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
            )];

            let config = Config::test(
                peers[restart_peer_idx].clone(),
                listen_addr,
                bootstrappers,
                1_024 * 1_024,
            );

            let (mut network, mut oracle) =
                Network::new(peer_context.with_label("network"), config);

            oracle.track(0, addresses.clone().try_into().unwrap()).await;

            let (sender, receiver) =
                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            senders[restart_peer_idx] = Some(sender);
            receivers[restart_peer_idx] = Some(receiver);

            let handle = network.start();
            handles[restart_peer_idx] = Some(handle);

            // Wait for the restarted peer to reconnect to all other peers.
            // For peer 2 (wrong address), this MUST happen via:
            // 1. The restarted peer dials peer 0 (the bootstrapper).
            // 2. Peer 0 gossips the restarted peer's new address to peer 2.
            // 3. Peer 2 dials the restarted peer.
            let restarted_sender = senders[restart_peer_idx].as_mut().unwrap();
            loop {
                let sent = restarted_sender
                    .send(
                        Recipients::All,
                        peers[restart_peer_idx].public_key().as_ref().to_vec(),
                        true,
                    )
                    .await
                    .unwrap();
                if sent.len() == n - 1 {
2129                    break;
2130                }
2131                context.sleep(Duration::from_millis(100)).await;
2132            }
2133
2134            // Verify all other peers can send to the restarted peer
2135            for i in 0..n {
2136                if i == restart_peer_idx {
2137                    continue;
2138                }
2139                let sender = senders[i].as_mut().unwrap();
2140                loop {
2141                    let sent = sender
2142                        .send(
2143                            Recipients::One(addresses[restart_peer_idx].clone()),
2144                            peers[i].public_key().as_ref().to_vec(),
2145                            true,
2146                        )
2147                        .await
2148                        .unwrap();
2149                    if sent.len() == 1 {
2150                        break;
2151                    }
2152                    context.sleep(Duration::from_millis(100)).await;
2153                }
2154            }
2155
2156            // Verify the restarted peer receives from all others
2157            let restarted_receiver = receivers[restart_peer_idx].as_mut().unwrap();
2158            let mut received = HashSet::new();
2159            while received.len() < n - 1 {
2160                let (sender, message) = restarted_receiver.recv().await.unwrap();
2161                assert_eq!(message, sender.as_ref());
2162                received.insert(sender);
2163            }
2164
2165            assert_no_rate_limiting(&context);
2166        });
2167    }
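
    // Illustrative sketch (hypothetical, not this crate's implementation): the
    // reconnection above relies on the bootstrapper noticing, from peer 2's bit
    // vector, that peer 2 lacks a usable address for the restarted peer. This
    // simplified model assumes peer identities are plain `u8`s in an
    // already-sorted set and addresses are `SocketAddr`s; `bit_vector` and
    // `addresses_to_share` are made-up names for the steps in the module docs.
    mod bit_vector_gossip_sketch {
        use std::collections::HashMap;
        use std::net::SocketAddr;

        /// Bit `i` is `true` iff `known` holds an address for `sorted_peers[i]`.
        pub fn bit_vector(sorted_peers: &[u8], known: &HashMap<u8, SocketAddr>) -> Vec<bool> {
            sorted_peers.iter().map(|p| known.contains_key(p)).collect()
        }

        /// Addresses we know for peers the remote marked '0' (unknown or stale).
        pub fn addresses_to_share(
            sorted_peers: &[u8],
            remote_bits: &[bool],
            known: &HashMap<u8, SocketAddr>,
        ) -> Vec<(u8, SocketAddr)> {
            sorted_peers
                .iter()
                .zip(remote_bits)
                .filter(|&(_, &bit)| !bit)
                .filter_map(|(p, _)| known.get(p).map(|addr| (*p, *addr)))
                .collect()
        }

        #[test]
        fn bootstrapper_shares_restarted_peer_address() {
            let peers = [0u8, 1, 2, 3, 4];
            let new_addr: SocketAddr = "127.0.0.1:3100".parse().unwrap();

            // Peer 2 dropped peer 1's stale address after repeated failed dials.
            let peer_2_known: HashMap<u8, SocketAddr> = [0u8, 2, 3, 4]
                .iter()
                .map(|&p| (p, SocketAddr::from(([127, 0, 0, 1], 3000 + u16::from(p)))))
                .collect();
            let peer_2_bits = bit_vector(&peers, &peer_2_known);
            assert_eq!(peer_2_bits, vec![true, false, true, true, true]);

            // The bootstrapper learned peer 1's new address when peer 1 dialed it.
            let bootstrapper_known = HashMap::from([(1u8, new_addr)]);
            let shared = addresses_to_share(&peers, &peer_2_bits, &bootstrapper_known);
            assert_eq!(shared, vec![(1u8, new_addr)]);
        }
    }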

    #[test_traced]
    fn test_operations_after_shutdown_do_not_panic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let peer = ed25519::PrivateKey::from_seed(0);
            let address = peer.public_key();

            let peer_context = context.with_label("peer");
            let config = Config::test(
                peer.clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000),
                vec![],
                MAX_MESSAGE_SIZE,
            );
            let (mut network, mut oracle) =
                Network::new(peer_context.with_label("network"), config);

            // Register channel and peer set
            let (mut sender, _receiver) =
                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            let peers: Set<ed25519::PublicKey> = vec![address.clone()].try_into().unwrap();
            oracle.track(0, peers.clone()).await;

            // Start and immediately abort the network
            let handle = network.start();
            handle.abort();

            // Wait for shutdown to propagate
            context.sleep(Duration::from_millis(100)).await;

            // Oracle operations should not panic even after shutdown
            oracle.track(1, peers.clone()).await;
            let _ = oracle.peer_set(0).await;
            let _ = oracle.subscribe().await;
            oracle.block(address.clone()).await;

            // Sender operations should not panic even after shutdown
            let sent = sender
                .send(Recipients::All, address.as_ref().to_vec(), true)
                .await
                .unwrap();
            assert!(sent.is_empty());
        });
    }
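
    // Illustrative sketch (hypothetical, not this crate's `Sender` or `Oracle`):
    // one way a handle can stay panic-free after the actor behind it stops, as
    // `test_operations_after_shutdown_do_not_panic` expects. A send to a stopped
    // actor reports an empty recipient list instead of unwrapping a closed
    // channel. std `mpsc` and threads stand in for the runtime's channels and
    // tasks; `Handle` and `spawn_actor` are made-up names.
    mod shutdown_handle_sketch {
        use std::sync::mpsc;
        use std::thread;

        /// A payload plus a reply channel for the list of recipients.
        type Request = (Vec<u8>, mpsc::Sender<Vec<u32>>);

        pub struct Handle {
            mailbox: mpsc::Sender<Request>,
        }

        impl Handle {
            /// Returns who received `payload`, or an empty list if the actor is gone.
            pub fn send(&self, payload: Vec<u8>) -> Vec<u32> {
                let (reply_tx, reply_rx) = mpsc::channel();
                if self.mailbox.send((payload, reply_tx)).is_err() {
                    return Vec::new(); // actor dropped its receiver
                }
                // Actor exited before replying: treat as "sent to nobody".
                reply_rx.recv().unwrap_or_default()
            }
        }

        /// Spawn an actor that serves `requests` sends to `peers`, then exits.
        pub fn spawn_actor(peers: Vec<u32>, requests: usize) -> (Handle, thread::JoinHandle<()>) {
            let (tx, rx) = mpsc::channel::<Request>();
            let actor = thread::spawn(move || {
                for (_payload, reply) in rx.iter().take(requests) {
                    let _ = reply.send(peers.clone());
                }
                // `rx` is dropped when this closure returns, closing the mailbox.
            });
            (Handle { mailbox: tx }, actor)
        }

        #[test]
        fn send_after_shutdown_returns_empty() {
            let (handle, actor) = spawn_actor(vec![1, 2], 1);
            assert_eq!(handle.send(b"hello".to_vec()), vec![1, 2]);
            actor.join().unwrap();
            assert!(handle.send(b"hello".to_vec()).is_empty());
        }
    }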

    fn clean_shutdown(seed: u64) {
        let cfg = deterministic::Config::default()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(30)));
        let executor = deterministic::Runner::new(cfg);
        executor.start(|context| async move {
            let peer = ed25519::PrivateKey::from_seed(0);

            let peer_context = context.with_label("peer");
            let config = Config::test(
                peer.clone(),
                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000),
                vec![],
                MAX_MESSAGE_SIZE,
            );
            let (mut network, mut oracle) =
                Network::new(peer_context.with_label("network"), config);

            // Register channel and peer set
            let (_, _) =
                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            let peers: Set<ed25519::PublicKey> = vec![peer.public_key()].try_into().unwrap();
            oracle.track(0, peers).await;

            // Start the network
            let handle = network.start();

            // Allow tasks to start
            context.sleep(Duration::from_millis(100)).await;

            // Count running tasks under the network prefix
            let running_before = count_running_tasks(&context, "peer_network");
            assert!(
                running_before > 0,
                "at least one network task should be running"
            );

            // Abort the network
            handle.abort();
            let _ = handle.await;

            // Give the runtime a tick to process aborts
            context.sleep(Duration::from_millis(100)).await;

            // Verify all network tasks are stopped
            let running_after = count_running_tasks(&context, "peer_network");
            assert_eq!(
                running_after, 0,
                "all network tasks should be stopped, but {running_after} still running"
            );
        });
    }

    #[test_traced]
    fn test_clean_shutdown() {
        for seed in 0..25 {
            clean_shutdown(seed);
        }
    }

    #[test]
    fn test_broadcast_slow_peer_no_blocking() {
        let executor = deterministic::Runner::timed(Duration::from_secs(5));
        executor.start(|context| async move {
            // Create router
            let cfg = RouterConfig { mailbox_size: 10 };
            let (router, mut mailbox, messenger) =
                RouterActor::<_, ed25519::PublicKey>::new(context.clone(), cfg);

            // Create channels for the router
            let channels = channels::Channels::new(messenger.clone(), MAX_MESSAGE_SIZE);
            let _handle = router.start(channels);

            // Register peer 1 with small buffer (will fill up after 10 messages)
            // Must keep receivers alive to prevent channel from closing
            let slow_peer = ed25519::PrivateKey::from_seed(0).public_key();
            let (slow_low, _slow_low_rx) = mpsc::channel(10);
            let (slow_high, _slow_high_rx) = mpsc::channel(10);
            assert!(
                mailbox
                    .ready(slow_peer.clone(), Relay::new(slow_low, slow_high))
                    .await
                    .is_some(),
                "Failed to register slow peer"
            );

            // Register peer 2 with large buffer
            let fast_peer = ed25519::PrivateKey::from_seed(1).public_key();
            let (fast_low, mut fast_receiver) = mpsc::channel(100);
            let (fast_high, _fast_high_rx) = mpsc::channel(100);
            assert!(
                mailbox
                    .ready(fast_peer.clone(), Relay::new(fast_low, fast_high))
                    .await
                    .is_some(),
                "Failed to register fast peer"
            );

            let message = IoBuf::from(vec![0u8; 100]);
            let mut messenger = messenger;

            // Send 10 messages to fill slow_peer's buffer
            for i in 0..10 {
                let sent = messenger
                    .content(Recipients::All, 0, message.clone().into(), false)
                    .await;
                assert_eq!(sent.len(), 2, "Broadcast {i} should reach both peers");
            }

            // 11th broadcast: slow_peer's buffer is full, so its message is dropped
            let sent = messenger
                .content(Recipients::All, 0, message.into(), false)
                .await;
            assert!(
                sent.contains(&fast_peer),
                "Fast peer should receive message"
            );

            // Verify fast_peer received all 11 messages
            for _ in 0..11 {
                assert!(fast_receiver.try_recv().is_ok());
            }
        });
    }
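
    // Illustrative sketch (hypothetical, not this crate's router): fanning a
    // broadcast out with a non-blocking `try_send` per peer, so a full buffer
    // drops only that peer's copy instead of stalling delivery to everyone else.
    // That is the property `test_broadcast_slow_peer_no_blocking` checks. std
    // `sync_channel` stands in for the router's bounded per-peer channels, and
    // `broadcast` / `peer_channel` are made-up names.
    mod non_blocking_broadcast_sketch {
        use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

        pub type Peer = (u32, SyncSender<Vec<u8>>);

        /// Enqueue `msg` for every peer that has room; return the ids that accepted it.
        pub fn broadcast(peers: &[Peer], msg: &[u8]) -> Vec<u32> {
            let mut sent = Vec::new();
            for (id, tx) in peers {
                // `try_send` fails immediately if the buffer is full or the peer
                // is disconnected; skip that peer rather than block.
                if tx.try_send(msg.to_vec()).is_ok() {
                    sent.push(*id);
                }
            }
            sent
        }

        /// A peer whose bounded buffer holds at most `buffer` messages.
        pub fn peer_channel(id: u32, buffer: usize) -> (Peer, Receiver<Vec<u8>>) {
            let (tx, rx) = sync_channel(buffer);
            ((id, tx), rx)
        }

        #[test]
        fn slow_peer_does_not_block_fast_peer() {
            // Keep `_slow_rx` alive so the slow peer stays connected.
            let (slow, _slow_rx) = peer_channel(0, 10);
            let (fast, fast_rx) = peer_channel(1, 100);
            let peers = vec![slow, fast];
            for _ in 0..10 {
                assert_eq!(broadcast(&peers, &[0u8; 100]), vec![0, 1]);
            }
            // The slow peer's buffer is full; only the fast peer accepts the 11th.
            assert_eq!(broadcast(&peers, &[0u8; 100]), vec![1]);
            assert_eq!(fast_rx.try_iter().count(), 11);
        }
    }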
}