Skip to main content

commonware_p2p/simulated/
mod.rs

1//! Simulate networking between peers with configurable link behavior (i.e. drops, latency, corruption, etc.).
2//!
3//! Both peer and link modification can be performed dynamically over the lifetime of the simulated network. This
4//! can be used to mimic transient network partitions, offline nodes (that later connect), and/or degrading link
5//! quality. Messages on a link are delivered in order, and optional per-peer bandwidth limits account for
6//! transmission delay and queueing.
7//!
8//! # Determinism
9//!
10//! `commonware-p2p::simulated` can be run deterministically when paired with `commonware-runtime::deterministic`.
11//! This makes it possible to reproduce an arbitrary order of delivered/dropped messages with a given seed.
12//!
13//! # Bandwidth Simulation
14//!
15//! The simulator provides a realistic model of bandwidth contention where network
16//! capacity is a shared, finite resource. Bandwidth is allocated via progressive
17//! filling to provide max-min fairness.
18//!
19//! _If no bandwidth constraints are provided (default behavior), progressive filling and bandwidth
20//! tracking are not performed (avoiding unnecessary overhead for minimal p2p testing common in CI)._
21//!
22//! ## Core Model
23//!
24//! Whenever a transfer starts or finishes, or a bandwidth limit is updated, we execute a scheduling tick:
25//!
26//! 1. **Collect Active Flows:** Gather every active transfer that still has
27//!    bytes to send. A flow is bound to one sender and to one receiver (if the message will be delivered).
28//! 2. **Compute Progressive Filling:** Run progressive filling to raise the rate of
29//!    every active flow in lock-step until some sender's egress or receiver's ingress
30//!    limit saturates (at which point the flow is frozen and the process repeats with what remains).
31//! 3. **Wait for the Next Event:** Using those rates, determine which flow will
32//!    finish first by computing how long it needs to transmit its remaining
33//!    bytes. Advance simulated time directly to that completion instant (advancing all other flows
34//!    by the bytes transferred over the interval).
35//! 4. **Deliver Message:** Remove the completed flow and pass the message to the receiver. Repeat from step 1
36//!    until all flows are processed.
37//!
38//! _Messages between the same pair of peers remain strictly ordered. When one
39//! message finishes, the next message on that link may begin sending at
40//! `arrival_time - new_latency` so that its first byte arrives immediately after
41//! the previous one is fully received._
42//!
43//! ## Latency vs. Transmission Delay
44//!
45//! The simulation correctly distinguishes between two key components of message delivery:
46//!
47//! - **Transmission Delay:** The time it takes to send all bytes of a message over
48//!   the link. This is determined by the message size and the available bandwidth
49//!   (e.g., a 10KB message on a 10KB/s link has a 1-second transmission delay).
50//! - **Network Latency:** The time it takes for a byte to travel from the sender
51//!   to the receiver, independent of bandwidth. This is configured via the `Link`
52//!   properties.
53//!
//! A message is delivered once its transmission has completed and the simulated network
//! latency has elapsed (delivery time = transmission completion time + latency). This model
//! ensures that large messages correctly occupy the network link for longer periods, affecting
//! other concurrent transfers, while still accounting for the physical travel time of the data.
58//!
59//! # Example
60//!
61//! ```rust
62//! use commonware_p2p::simulated::{Config, Link, Network};
63//! use commonware_cryptography::{ed25519, PrivateKey, Signer as _, PublicKey as _, };
64//! use commonware_runtime::{deterministic, Metrics, Quota, Runner, Spawner, Supervisor};
65//! use commonware_utils::{NZU32, NZUsize};
66//! use std::time::Duration;
67//!
68//! // Generate peers
69//! let peers = vec![
70//!     ed25519::PrivateKey::from_seed(0).public_key(),
71//!     ed25519::PrivateKey::from_seed(1).public_key(),
72//!     ed25519::PrivateKey::from_seed(2).public_key(),
73//!     ed25519::PrivateKey::from_seed(3).public_key(),
74//! ];
75//!
76//! // Configure network
77//! let p2p_cfg = Config {
78//!     max_size: 1024 * 1024, // 1MB
79//!     disconnect_on_block: true,
80//!     tracked_peer_sets: NZUsize!(3),
81//! };
82//!
83//! // Rate limit quota (1000 messages per second per peer)
84//! let quota = Quota::per_second(NZU32!(1000));
85//!
86//! // Start context
87//! let executor = deterministic::Runner::seeded(0);
88//! executor.start(|context| async move {
89//!     // Initialize the network with an initial peer set (tracked at id 0).
90//!     let (network, oracle) =
91//!         Network::new_with_peers(context.child("network"), p2p_cfg, peers.clone())
92//!             .await;
93//!
94//!     // Start network
95//!     let network_handler = network.start();
96//!
97//!     let (sender1, receiver1) = oracle.control(peers[0].clone()).register(0, quota).await.unwrap();
98//!     let (sender2, receiver2) = oracle.control(peers[1].clone()).register(0, quota).await.unwrap();
99//!
100//!     // Set bandwidth limits
101//!     // peer[0]: 10KB/s egress, unlimited ingress
102//!     // peer[1]: unlimited egress, 5KB/s ingress
103//!     oracle.limit_bandwidth(peers[0].clone(), Some(10_000), None).await.unwrap();
104//!     oracle.limit_bandwidth(peers[1].clone(), None, Some(5_000)).await.unwrap();
105//!
106//!     // Link 2 peers
107//!     oracle.add_link(
108//!         peers[0].clone(),
109//!         peers[1].clone(),
110//!         Link {
111//!             latency: Duration::from_millis(5),
112//!             jitter: Duration::from_millis(2),
113//!             success_rate: 0.75,
114//!         },
115//!     ).await.unwrap();
116//!
117//!     // ... Use sender and receiver ...
118//!
119//!     // Update link
120//!     oracle.remove_link(
121//!         peers[0].clone(),
122//!         peers[1].clone(),
123//!     ).await.unwrap();
124//!     oracle.add_link(
125//!         peers[0].clone(),
126//!         peers[1].clone(),
127//!         Link {
128//!             latency: Duration::from_millis(100),
129//!             jitter: Duration::from_millis(25),
130//!             success_rate: 0.8,
131//!         },
132//!     ).await.unwrap();
133//!
134//!     // ... Use sender and receiver ...
135//!
136//!     // Shutdown network
137//!     network_handler.abort();
138//! });
139//! ```
140
141mod bandwidth;
142mod ingress;
143mod metrics;
144mod network;
145mod transmitter;
146
147use thiserror::Error;
148
149/// Errors that can occur when interacting with the network.
150#[derive(Debug, Error)]
151pub enum Error {
152    #[error("network closed")]
153    NetworkClosed,
154    #[error("not valid to link self")]
155    LinkingSelf,
156    #[error("link already exists")]
157    LinkExists,
158    #[error("link missing")]
159    LinkMissing,
160    #[error("invalid success rate (must be in [0, 1]): {0}")]
161    InvalidSuccessRate(f64),
162    #[error("send_frame failed")]
163    SendFrameFailed,
164    #[error("recv_frame failed")]
165    RecvFrameFailed,
166    #[error("bind failed")]
167    BindFailed,
168    #[error("accept failed")]
169    AcceptFailed,
170    #[error("dial failed")]
171    DialFailed,
172    #[error("peer missing")]
173    PeerMissing,
174}
175
176pub use ingress::{Control, Link, Manager, Oracle, SocketManager};
177pub use network::{
178    Config, ConnectedPeerProvider, Network, Receiver, Sender, SplitForwarder, SplitOrigin,
179    SplitRouter, SplitSender, SplitTarget, UnlimitedSender,
180};
181
182#[cfg(test)]
183mod tests {
184    use super::*;
185    use crate::{
186        Address, AddressableManager, AddressableTrackedPeers, CheckedSender as _, Ingress,
187        LimitedSender as _, Manager, Provider, Receiver, Recipients, Sender, TrackedPeers,
188    };
189    use commonware_cryptography::{
190        ed25519::{self, PrivateKey, PublicKey},
191        Signer as _,
192    };
193    use commonware_macros::{select, test_group};
194    use commonware_runtime::{
195        deterministic, reschedule, telemetry::metrics::count_running_tasks, Clock, IoBuf, Quota,
196        Runner, Spawner, Supervisor as _,
197    };
198    use commonware_utils::{
199        channel::mpsc,
200        hostname, ordered,
201        ordered::{Map, Set},
202        NZUsize, NZU32,
203    };
204    use rand::Rng;
205    use std::{
206        collections::{BTreeMap, HashMap, HashSet},
207        net::SocketAddr,
208        num::NonZeroU32,
209        time::Duration,
210    };
211
212    /// Default rate limit set high enough to not interfere with normal operation
213    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
214
215    async fn track_peers<I>(oracle: &Oracle<PublicKey, deterministic::Context>, peers: I)
216    where
217        I: IntoIterator<Item = PublicKey>,
218    {
219        let mut manager = oracle.manager();
220        manager.track(0, Set::from_iter_dedup(peers));
221        assert!(manager.peer_set(0).await.is_some());
222    }
223
224    async fn wait_for_task_count(
225        context: &deterministic::Context,
226        prefix: &str,
227        expected: impl Fn(usize) -> bool,
228    ) {
229        loop {
230            let count = count_running_tasks(context, prefix);
231            if expected(count) {
232                return;
233            }
234            reschedule().await;
235        }
236    }
237
238    fn simulate_messages(seed: u64, size: usize) -> (String, Vec<usize>) {
239        let executor = deterministic::Runner::seeded(seed);
240        executor.start(|context| async move {
241            // Create simulated network
242            let (network, oracle) = Network::new(
243                context.child("network"),
244                Config {
245                    max_size: 1024 * 1024,
246                    disconnect_on_block: true,
247                    tracked_peer_sets: NZUsize!(1),
248                },
249            );
250
251            // Start network
252            network.start();
253
254            // Register agents
255            let mut agents = BTreeMap::new();
256            let (seen_sender, mut seen_receiver) = mpsc::channel(1024);
257            for i in 0..size {
258                let pk = PrivateKey::from_seed(i as u64).public_key();
259                let (sender, mut receiver) = oracle
260                    .control(pk.clone())
261                    .register(0, TEST_QUOTA)
262                    .await
263                    .unwrap();
264                agents.insert(pk, sender);
265                let agent_sender = seen_sender.clone();
266                context.child("agent_receiver").spawn(move |_| async move {
267                    for _ in 0..size {
268                        receiver.recv().await.unwrap();
269                    }
270                    agent_sender.send(i).await.unwrap();
271
272                    // Exiting early here tests the case where the recipient end of an agent is dropped
273                });
274            }
275            track_peers(&oracle, agents.keys().cloned()).await;
276
277            // Link all outbound-capable agents.
278            let only_inbound = PrivateKey::from_seed(0).public_key();
279            for agent in agents.keys() {
280                if agent == &only_inbound {
281                    // Leave this peer inbound-only to exercise missing-link handling.
282                    continue;
283                }
284                for other in agents.keys() {
285                    let result = oracle
286                        .add_link(
287                            agent.clone(),
288                            other.clone(),
289                            Link {
290                                latency: Duration::from_millis(5),
291                                jitter: Duration::from_millis(2),
292                                success_rate: 0.75,
293                            },
294                        )
295                        .await;
296                    if agent == other {
297                        assert!(matches!(result, Err(Error::LinkingSelf)));
298                    } else {
299                        assert!(result.is_ok());
300                    }
301                }
302            }
303
304            context
305                .child("agent_sender")
306                .spawn(|mut context| async move {
307                    // BTreeMap iteration gives deterministic sender selection.
308                    let keys = agents.keys().cloned().collect::<Vec<_>>();
309
310                    loop {
311                        let index = context.gen_range(0..keys.len());
312                        let sender = &keys[index];
313                        let msg = format!("hello from {sender:?}");
314                        let msg = IoBuf::copy_from_slice(msg.as_bytes());
315                        let message_sender = agents.get_mut(sender).unwrap();
316                        let sent = message_sender.send(Recipients::All, msg, false);
317                        assert_eq!(sent.len(), keys.len() - 1);
318                        reschedule().await;
319                    }
320                });
321
322            // Wait for all recipients
323            let mut results = Vec::new();
324            for _ in 0..size {
325                results.push(seen_receiver.recv().await.unwrap());
326            }
327            (context.auditor().state(), results)
328        })
329    }
330
331    fn compare_outputs(seeds: u64, size: usize) {
332        // Collect outputs
333        let mut outputs = Vec::new();
334        for seed in 0..seeds {
335            outputs.push(simulate_messages(seed, size));
336        }
337
338        // Confirm outputs are deterministic
339        for seed in 0..seeds {
340            let output = simulate_messages(seed, size);
341            assert_eq!(output, outputs[seed as usize]);
342        }
343    }
344
345    #[test_group("slow")]
346    #[test]
347    fn test_determinism() {
348        compare_outputs(25, 25);
349    }
350
351    #[test]
352    #[should_panic(expected = "message too large")]
353    fn test_message_too_big() {
354        let executor = deterministic::Runner::default();
355        executor.start(|mut context| async move {
356            // Create simulated network
357            let (network, oracle) = Network::new(
358                context.child("network"),
359                Config {
360                    max_size: 1024 * 1024,
361                    disconnect_on_block: true,
362                    tracked_peer_sets: NZUsize!(1),
363                },
364            );
365
366            // Start network
367            network.start();
368
369            // Register agents
370            let mut agents = HashMap::new();
371            for i in 0..10 {
372                let pk = PrivateKey::from_seed(i as u64).public_key();
373                let (sender, _) = oracle
374                    .control(pk.clone())
375                    .register(0, TEST_QUOTA)
376                    .await
377                    .unwrap();
378                agents.insert(pk, sender);
379            }
380
381            // Send invalid message
382            let keys = agents.keys().collect::<Vec<_>>();
383            let index = context.gen_range(0..keys.len());
384            let sender = keys[index];
385            let mut message_sender = agents.get(sender).unwrap().clone();
386            let mut msg = vec![0u8; 1024 * 1024 + 1];
387            context.fill(&mut msg[..]);
388            message_sender.send(Recipients::All, msg, false);
389        });
390    }
391
392    #[test]
393    fn test_linking_self() {
394        let executor = deterministic::Runner::default();
395        executor.start(|context| async move {
396            // Create simulated network
397            let (network, oracle) = Network::new(
398                context.child("network"),
399                Config {
400                    max_size: 1024 * 1024,
401                    disconnect_on_block: true,
402                    tracked_peer_sets: NZUsize!(1),
403                },
404            );
405
406            // Start network
407            network.start();
408
409            // Register agents
410            let pk = PrivateKey::from_seed(0).public_key();
411            oracle
412                .control(pk.clone())
413                .register(0, TEST_QUOTA)
414                .await
415                .unwrap();
416
417            // Attempt to link self
418            let result = oracle
419                .add_link(
420                    pk.clone(),
421                    pk,
422                    Link {
423                        latency: Duration::from_millis(5),
424                        jitter: Duration::from_millis(2),
425                        success_rate: 0.75,
426                    },
427                )
428                .await;
429
430            // Confirm error is correct
431            assert!(matches!(result, Err(Error::LinkingSelf)));
432        });
433    }
434
435    #[test]
436    fn test_duplicate_channel() {
437        let executor = deterministic::Runner::default();
438        executor.start(|context| async move {
439            // Create simulated network
440            let (network, oracle) = Network::new(
441                context.child("network"),
442                Config {
443                    max_size: 1024 * 1024,
444                    disconnect_on_block: true,
445                    tracked_peer_sets: NZUsize!(1),
446                },
447            );
448
449            // Start network
450            network.start();
451
452            // Setup links
453            let my_pk = PrivateKey::from_seed(0).public_key();
454            let other_pk = PrivateKey::from_seed(1).public_key();
455            oracle
456                .add_link(
457                    my_pk.clone(),
458                    other_pk.clone(),
459                    Link {
460                        latency: Duration::from_millis(10),
461                        jitter: Duration::from_millis(1),
462                        success_rate: 1.0,
463                    },
464                )
465                .await
466                .unwrap();
467            oracle
468                .add_link(
469                    other_pk.clone(),
470                    my_pk.clone(),
471                    Link {
472                        latency: Duration::from_millis(10),
473                        jitter: Duration::from_millis(1),
474                        success_rate: 1.0,
475                    },
476                )
477                .await
478                .unwrap();
479
480            // Register channels
481            let (mut my_sender, mut my_receiver) = oracle
482                .control(my_pk.clone())
483                .register(0, TEST_QUOTA)
484                .await
485                .unwrap();
486            let (mut other_sender, mut other_receiver) = oracle
487                .control(other_pk.clone())
488                .register(0, TEST_QUOTA)
489                .await
490                .unwrap();
491            track_peers(&oracle, [my_pk.clone(), other_pk.clone()]).await;
492
493            // Send messages
494            let msg = IoBuf::from(b"hello");
495            let sent = my_sender.send(Recipients::One(other_pk.clone()), msg.clone(), false);
496            assert_eq!(sent.len(), 1);
497            let (from, message) = other_receiver.recv().await.unwrap();
498            assert_eq!(from, my_pk);
499            assert_eq!(message, msg.clone());
500            let sent = other_sender.send(Recipients::One(my_pk.clone()), msg.clone(), false);
501            assert_eq!(sent.len(), 1);
502            let (from, message) = my_receiver.recv().await.unwrap();
503            assert_eq!(from, other_pk);
504            assert_eq!(message, msg);
505
506            // Update channel
507            let (mut my_sender_2, mut my_receiver_2) = oracle
508                .control(my_pk.clone())
509                .register(0, TEST_QUOTA)
510                .await
511                .unwrap();
512
513            // Send message
514            let msg = IoBuf::from(b"hello again");
515            let sent = my_sender_2.send(Recipients::One(other_pk.clone()), msg.clone(), false);
516            assert_eq!(sent.len(), 1);
517            let (from, message) = other_receiver.recv().await.unwrap();
518            assert_eq!(from, my_pk);
519            assert_eq!(message, msg.clone());
520            let sent = other_sender.send(Recipients::One(my_pk.clone()), msg.clone(), false);
521            assert_eq!(sent.len(), 1);
522            let (from, message) = my_receiver_2.recv().await.unwrap();
523            assert_eq!(from, other_pk);
524            assert_eq!(message, msg.clone());
525
526            // Listen on original
527            assert!(matches!(
528                my_receiver.recv().await,
529                Err(Error::NetworkClosed)
530            ));
531
532            // Send on original gracefully handles a closed channel.
533            my_sender.send(Recipients::One(other_pk.clone()), msg, false);
534        });
535    }
536
537    #[test]
538    fn test_invalid_success_rate() {
539        let executor = deterministic::Runner::default();
540        executor.start(|context| async move {
541            // Create simulated network
542            let (network, oracle) = Network::new(
543                context.child("network"),
544                Config {
545                    max_size: 1024 * 1024,
546                    disconnect_on_block: true,
547                    tracked_peer_sets: NZUsize!(1),
548                },
549            );
550
551            // Start network
552            network.start();
553
554            // Register agents
555            let pk1 = PrivateKey::from_seed(0).public_key();
556            let pk2 = PrivateKey::from_seed(1).public_key();
557            oracle
558                .control(pk1.clone())
559                .register(0, TEST_QUOTA)
560                .await
561                .unwrap();
562            oracle
563                .control(pk2.clone())
564                .register(0, TEST_QUOTA)
565                .await
566                .unwrap();
567
568            // Attempt to link with invalid success rate
569            let result = oracle
570                .add_link(
571                    pk1,
572                    pk2,
573                    Link {
574                        latency: Duration::from_millis(5),
575                        jitter: Duration::from_millis(2),
576                        success_rate: 1.5,
577                    },
578                )
579                .await;
580
581            // Confirm error is correct
582            assert!(matches!(result, Err(Error::InvalidSuccessRate(_))));
583        });
584    }
585
586    #[test]
587    fn test_add_link_before_channel_registration() {
588        let executor = deterministic::Runner::default();
589        executor.start(|context| async move {
590            // Create peers
591            let pk1 = PrivateKey::from_seed(0).public_key();
592            let pk2 = PrivateKey::from_seed(1).public_key();
593
594            // Create simulated network
595            let (network, oracle) = Network::new_with_peers(
596                context.child("network"),
597                Config {
598                    max_size: 1024 * 1024,
599                    disconnect_on_block: true,
600                    tracked_peer_sets: NZUsize!(3),
601                },
602                [pk1.clone(), pk2.clone()],
603            )
604            .await;
605            network.start();
606
607            // Add link
608            oracle
609                .add_link(
610                    pk1.clone(),
611                    pk2.clone(),
612                    Link {
613                        latency: Duration::ZERO,
614                        jitter: Duration::ZERO,
615                        success_rate: 1.0,
616                    },
617                )
618                .await
619                .unwrap();
620
621            // Register channels
622            let (mut sender1, _receiver1) = oracle
623                .control(pk1.clone())
624                .register(0, TEST_QUOTA)
625                .await
626                .unwrap();
627            let (_, mut receiver2) = oracle
628                .control(pk2.clone())
629                .register(0, TEST_QUOTA)
630                .await
631                .unwrap();
632
633            // Send message
634            let msg1 = IoBuf::from(b"link-before-register-1");
635            sender1.send(Recipients::One(pk2.clone()), msg1.clone(), false);
636            let (from, received) = receiver2.recv().await.unwrap();
637            assert_eq!(from, pk1);
638            assert_eq!(received, msg1);
639        });
640    }
641
642    #[test]
643    fn test_simple_message_delivery() {
644        let executor = deterministic::Runner::default();
645        executor.start(|context| async move {
646            // Create simulated network
647            let (network, oracle) = Network::new(
648                context.child("network"),
649                Config {
650                    max_size: 1024 * 1024,
651                    disconnect_on_block: true,
652                    tracked_peer_sets: NZUsize!(1),
653                },
654            );
655
656            // Start network
657            network.start();
658
659            // Register agents
660            let pk1 = PrivateKey::from_seed(0).public_key();
661            let pk2 = PrivateKey::from_seed(1).public_key();
662            let (mut sender1, mut receiver1) = oracle
663                .control(pk1.clone())
664                .register(0, TEST_QUOTA)
665                .await
666                .unwrap();
667            let (mut sender2, mut receiver2) = oracle
668                .control(pk2.clone())
669                .register(0, TEST_QUOTA)
670                .await
671                .unwrap();
672
673            // Register unused channels
674            let _ = oracle
675                .control(pk1.clone())
676                .register(1, TEST_QUOTA)
677                .await
678                .unwrap();
679            let _ = oracle
680                .control(pk2.clone())
681                .register(2, TEST_QUOTA)
682                .await
683                .unwrap();
684            track_peers(&oracle, [pk1.clone(), pk2.clone()]).await;
685
686            // Link agents
687            oracle
688                .add_link(
689                    pk1.clone(),
690                    pk2.clone(),
691                    Link {
692                        latency: Duration::from_millis(5),
693                        jitter: Duration::from_millis(2),
694                        success_rate: 1.0,
695                    },
696                )
697                .await
698                .unwrap();
699            oracle
700                .add_link(
701                    pk2.clone(),
702                    pk1.clone(),
703                    Link {
704                        latency: Duration::from_millis(5),
705                        jitter: Duration::from_millis(2),
706                        success_rate: 1.0,
707                    },
708                )
709                .await
710                .unwrap();
711
712            // Send messages
713            let msg1 = IoBuf::from(b"hello from pk1");
714            let msg2 = IoBuf::from(b"hello from pk2");
715            sender1.send(Recipients::One(pk2.clone()), msg1.clone(), false);
716            sender2.send(Recipients::One(pk1.clone()), msg2.clone(), false);
717
718            // Confirm message delivery
719            let (sender, message) = receiver1.recv().await.unwrap();
720            assert_eq!(sender, pk2);
721            assert_eq!(message, msg2);
722            let (sender, message) = receiver2.recv().await.unwrap();
723            assert_eq!(sender, pk1);
724            assert_eq!(message, msg1);
725        });
726    }
727
728    #[test]
729    fn test_send_wrong_channel() {
730        let executor = deterministic::Runner::default();
731        executor.start(|context| async move {
732            // Create simulated network
733            let (network, oracle) = Network::new(
734                context.child("network"),
735                Config {
736                    max_size: 1024 * 1024,
737                    disconnect_on_block: true,
738                    tracked_peer_sets: NZUsize!(1),
739                },
740            );
741
742            // Start network
743            network.start();
744
745            // Register agents
746            let pk1 = PrivateKey::from_seed(0).public_key();
747            let pk2 = PrivateKey::from_seed(1).public_key();
748            let (mut sender1, _) = oracle
749                .control(pk1.clone())
750                .register(0, TEST_QUOTA)
751                .await
752                .unwrap();
753            let (_, mut receiver2) = oracle
754                .control(pk2.clone())
755                .register(1, TEST_QUOTA)
756                .await
757                .unwrap();
758            track_peers(&oracle, [pk1.clone(), pk2.clone()]).await;
759
760            // Link agents
761            oracle
762                .add_link(
763                    pk1,
764                    pk2.clone(),
765                    Link {
766                        latency: Duration::from_millis(5),
767                        jitter: Duration::ZERO,
768                        success_rate: 1.0,
769                    },
770                )
771                .await
772                .unwrap();
773
774            // Send message
775            let msg = IoBuf::from(b"hello from pk1");
776            sender1.send(Recipients::One(pk2), msg, false);
777
778            // Confirm no message delivery
779            select! {
780                _ = receiver2.recv() => {
781                    panic!("unexpected message");
782                },
783                _ = context.sleep(Duration::from_secs(1)) => {},
784            }
785        });
786    }
787
788    #[test]
789    fn test_dynamic_peers() {
790        let executor = deterministic::Runner::default();
791        executor.start(|context| async move {
792            // Create simulated network
793            let (network, oracle) = Network::new(
794                context.child("network"),
795                Config {
796                    max_size: 1024 * 1024,
797                    disconnect_on_block: true,
798                    tracked_peer_sets: NZUsize!(1),
799                },
800            );
801
802            // Start network
803            network.start();
804
805            // Define agents
806            let pk1 = PrivateKey::from_seed(0).public_key();
807            let pk2 = PrivateKey::from_seed(1).public_key();
808            let (mut sender1, mut receiver1) = oracle
809                .control(pk1.clone())
810                .register(0, TEST_QUOTA)
811                .await
812                .unwrap();
813            let (mut sender2, mut receiver2) = oracle
814                .control(pk2.clone())
815                .register(0, TEST_QUOTA)
816                .await
817                .unwrap();
818            track_peers(&oracle, [pk1.clone(), pk2.clone()]).await;
819
820            // Link agents
821            oracle
822                .add_link(
823                    pk1.clone(),
824                    pk2.clone(),
825                    Link {
826                        latency: Duration::from_millis(5),
827                        jitter: Duration::from_millis(2),
828                        success_rate: 1.0,
829                    },
830                )
831                .await
832                .unwrap();
833            oracle
834                .add_link(
835                    pk2.clone(),
836                    pk1.clone(),
837                    Link {
838                        latency: Duration::from_millis(5),
839                        jitter: Duration::from_millis(2),
840                        success_rate: 1.0,
841                    },
842                )
843                .await
844                .unwrap();
845
846            // Send messages
847            let msg1 = IoBuf::from(b"attempt 1: hello from pk1");
848            let msg2 = IoBuf::from(b"attempt 1: hello from pk2");
849            sender1.send(Recipients::One(pk2.clone()), msg1.clone(), false);
850            sender2.send(Recipients::One(pk1.clone()), msg2.clone(), false);
851
852            // Confirm message delivery
853            let (sender, message) = receiver1.recv().await.unwrap();
854            assert_eq!(sender, pk2);
855            assert_eq!(message, msg2);
856            let (sender, message) = receiver2.recv().await.unwrap();
857            assert_eq!(sender, pk1);
858            assert_eq!(message, msg1);
859        });
860    }
861
862    #[test]
863    fn test_dynamic_links() {
864        let executor = deterministic::Runner::default();
865        executor.start(|context| async move {
866            // Create simulated network
867            let (network, oracle) = Network::new(
868                context.child("network"),
869                Config {
870                    max_size: 1024 * 1024,
871                    disconnect_on_block: true,
872                    tracked_peer_sets: NZUsize!(1),
873                },
874            );
875
876            // Start network
877            network.start();
878
879            // Register agents
880            let pk1 = PrivateKey::from_seed(0).public_key();
881            let pk2 = PrivateKey::from_seed(1).public_key();
882            let (mut sender1, mut receiver1) = oracle
883                .control(pk1.clone())
884                .register(0, TEST_QUOTA)
885                .await
886                .unwrap();
887            let (mut sender2, mut receiver2) = oracle
888                .control(pk2.clone())
889                .register(0, TEST_QUOTA)
890                .await
891                .unwrap();
892            track_peers(&oracle, [pk1.clone(), pk2.clone()]).await;
893
894            // Send messages
895            let msg1 = IoBuf::from(b"attempt 1: hello from pk1");
896            let msg2 = IoBuf::from(b"attempt 1: hello from pk2");
897            sender1.send(Recipients::One(pk2.clone()), msg1.clone(), false);
898            sender2.send(Recipients::One(pk1.clone()), msg2.clone(), false);
899
900            // Confirm no message delivery
901            select! {
902                _ = receiver1.recv() => {
903                    panic!("unexpected message");
904                },
905                _ = receiver2.recv() => {
906                    panic!("unexpected message");
907                },
908                _ = context.sleep(Duration::from_secs(1)) => {},
909            }
910
911            // Link agents
912            oracle
913                .add_link(
914                    pk1.clone(),
915                    pk2.clone(),
916                    Link {
917                        latency: Duration::from_millis(5),
918                        jitter: Duration::from_millis(2),
919                        success_rate: 1.0,
920                    },
921                )
922                .await
923                .unwrap();
924            oracle
925                .add_link(
926                    pk2.clone(),
927                    pk1.clone(),
928                    Link {
929                        latency: Duration::from_millis(5),
930                        jitter: Duration::from_millis(2),
931                        success_rate: 1.0,
932                    },
933                )
934                .await
935                .unwrap();
936
937            // Send messages
938            let msg1 = IoBuf::from(b"attempt 2: hello from pk1");
939            let msg2 = IoBuf::from(b"attempt 2: hello from pk2");
940            sender1.send(Recipients::One(pk2.clone()), msg1.clone(), false);
941            sender2.send(Recipients::One(pk1.clone()), msg2.clone(), false);
942
943            // Confirm message delivery
944            let (sender, message) = receiver1.recv().await.unwrap();
945            assert_eq!(sender, pk2);
946            assert_eq!(message, msg2);
947            let (sender, message) = receiver2.recv().await.unwrap();
948            assert_eq!(sender, pk1);
949            assert_eq!(message, msg1);
950
951            // Remove links
952            oracle.remove_link(pk1.clone(), pk2.clone()).await.unwrap();
953            oracle.remove_link(pk2.clone(), pk1.clone()).await.unwrap();
954
955            // Send messages
956            let msg1 = IoBuf::from(b"attempt 3: hello from pk1");
957            let msg2 = IoBuf::from(b"attempt 3: hello from pk2");
958            sender1.send(Recipients::One(pk2.clone()), msg1.clone(), false);
959            sender2.send(Recipients::One(pk1.clone()), msg2.clone(), false);
960
961            // Confirm no message delivery
962            select! {
963                _ = receiver1.recv() => {
964                    panic!("unexpected message");
965                },
966                _ = receiver2.recv() => {
967                    panic!("unexpected message");
968                },
969                _ = context.sleep(Duration::from_secs(1)) => {},
970            }
971
972            // Remove non-existent links
973            let result = oracle.remove_link(pk1, pk2).await;
974            assert!(matches!(result, Err(Error::LinkMissing)));
975        });
976    }
977
978    async fn test_bandwidth_between_peers(
979        context: &mut deterministic::Context,
980        oracle: &Oracle<PublicKey, deterministic::Context>,
981        index: u64,
982        sender_bps: Option<usize>,
983        receiver_bps: Option<usize>,
984        message_size: usize,
985        expected_duration_ms: u64,
986    ) {
987        // Create two agents
988        let pk1 = PrivateKey::from_seed(context.gen::<u64>()).public_key();
989        let pk2 = PrivateKey::from_seed(context.gen::<u64>()).public_key();
990        let (mut sender, _) = oracle
991            .control(pk1.clone())
992            .register(0, TEST_QUOTA)
993            .await
994            .unwrap();
995        let (_, mut receiver) = oracle
996            .control(pk2.clone())
997            .register(0, TEST_QUOTA)
998            .await
999            .unwrap();
1000        let mut manager = oracle.manager();
1001        manager.track(index, Set::from_iter_dedup([pk1.clone(), pk2.clone()]));
1002
1003        // Set bandwidth limits
1004        oracle
1005            .limit_bandwidth(pk1.clone(), sender_bps, None)
1006            .await
1007            .unwrap();
1008        oracle
1009            .limit_bandwidth(pk2.clone(), None, receiver_bps)
1010            .await
1011            .unwrap();
1012
1013        // Link the two agents
1014        oracle
1015            .add_link(
1016                pk1.clone(),
1017                pk2.clone(),
1018                Link {
1019                    // No latency so it doesn't interfere with bandwidth delay calculation
1020                    latency: Duration::ZERO,
1021                    jitter: Duration::ZERO,
1022                    success_rate: 1.0,
1023                },
1024            )
1025            .await
1026            .unwrap();
1027
1028        // Send a message from agent 1 to 2
1029        let msg = IoBuf::from(vec![42u8; message_size]);
1030        let start = context.current();
1031        sender.send(Recipients::One(pk2.clone()), msg.clone(), true);
1032
1033        // Measure how long it takes for agent 2 to receive the message
1034        let (origin, received) = receiver.recv().await.unwrap();
1035        let elapsed = context.current().duration_since(start).unwrap();
1036
1037        assert_eq!(origin, pk1);
1038        assert_eq!(received, msg);
1039        assert!(
1040            elapsed >= Duration::from_millis(expected_duration_ms),
1041            "Message arrived too quickly: {elapsed:?} (expected >= {expected_duration_ms}ms)"
1042        );
1043        assert!(
1044            elapsed < Duration::from_millis(expected_duration_ms + 100),
1045            "Message took too long: {elapsed:?} (expected ~{expected_duration_ms}ms)"
1046        );
1047    }
1048
1049    #[test]
1050    fn test_bandwidth() {
1051        let executor = deterministic::Runner::default();
1052        executor.start(|mut context| async move {
1053            let (network, oracle) = Network::new(
1054                context.child("network"),
1055                Config {
1056                    max_size: 1024 * 1024,
1057                    disconnect_on_block: true,
1058                    tracked_peer_sets: NZUsize!(1),
1059                },
1060            );
1061            network.start();
1062
1063            // Both sender and receiver have the same bandiwdth (1000 B/s)
1064            // 500 bytes at 1000 B/s = 0.5 seconds
1065            test_bandwidth_between_peers(
1066                &mut context,
1067                &oracle,
1068                0,
1069                Some(1000), // sender egress
1070                Some(1000), // receiver ingress
1071                500,        // message size
1072                500,        // expected duration in ms
1073            )
1074            .await;
1075
1076            // Sender has lower bandwidth (500 B/s) than receiver (2000 B/s)
1077            // Should be limited by sender's 500 B/s
1078            // 250 bytes at 500 B/s = 0.5 seconds
1079            test_bandwidth_between_peers(
1080                &mut context,
1081                &oracle,
1082                1,
1083                Some(500),  // sender egress
1084                Some(2000), // receiver ingress
1085                250,        // message size
1086                500,        // expected duration in ms
1087            )
1088            .await;
1089
1090            // Sender has higher bandwidth (2000 B/s) than receiver (500 B/s)
1091            // Should be limited by receiver's 500 B/s
1092            // 250 bytes at 500 B/s = 0.5 seconds
1093            test_bandwidth_between_peers(
1094                &mut context,
1095                &oracle,
1096                2,
1097                Some(2000), // sender egress
1098                Some(500),  // receiver ingress
1099                250,        // message size
1100                500,        // expected duration in ms
1101            )
1102            .await;
1103
1104            // Unlimited sender, limited receiver
1105            // Should be limited by receiver's 1000 B/s
1106            // 500 bytes at 1000 B/s = 0.5 seconds
1107            test_bandwidth_between_peers(
1108                &mut context,
1109                &oracle,
1110                3,
1111                None,       // sender egress (unlimited)
1112                Some(1000), // receiver ingress
1113                500,        // message size
1114                500,        // expected duration in ms
1115            )
1116            .await;
1117
1118            // Limited sender, unlimited receiver
1119            // Should be limited by sender's 1000 B/s
1120            // 500 bytes at 1000 B/s = 0.5 seconds
1121            test_bandwidth_between_peers(
1122                &mut context,
1123                &oracle,
1124                4,
1125                Some(1000), // sender egress
1126                None,       // receiver ingress (unlimited)
1127                500,        // message size
1128                500,        // expected duration in ms
1129            )
1130            .await;
1131
1132            // Unlimited sender, unlimited receiver
1133            // Delivery should be (almost) instant
1134            test_bandwidth_between_peers(
1135                &mut context,
1136                &oracle,
1137                5,
1138                None, // sender egress (unlimited)
1139                None, // receiver ingress (unlimited)
1140                500,  // message size
1141                0,    // expected duration in ms
1142            )
1143            .await;
1144        });
1145    }
1146
1147    #[test]
1148    fn test_bandwidth_contention() {
1149        // Test bandwidth contention with many peers (one-to-many and many-to-one scenarios)
1150        let executor = deterministic::Runner::default();
1151        executor.start(|context| async move {
1152            let (network, oracle) = Network::new(
1153                context.child("network"),
1154                Config {
1155                    max_size: 1024 * 1024,
1156                    disconnect_on_block: true,
1157                    tracked_peer_sets: NZUsize!(1),
1158                },
1159            );
1160            network.start();
1161
1162            // Configuration
1163            const NUM_PEERS: usize = 100;
1164            const MESSAGE_SIZE: usize = 1000; // 1KB per message
1165            const EFFECTIVE_BPS: usize = 10_000; // 10KB/s egress/ingress per peer
1166
1167            // Create peers
1168            let mut peers = Vec::with_capacity(NUM_PEERS + 1);
1169            let mut senders = Vec::with_capacity(NUM_PEERS + 1);
1170            let mut receivers = Vec::with_capacity(NUM_PEERS + 1);
1171
1172            // Create the main peer (index 0) and 100 other peers
1173            for i in 0..=NUM_PEERS {
1174                let pk = PrivateKey::from_seed(i as u64).public_key();
1175                let (sender, receiver) = oracle
1176                    .control(pk.clone())
1177                    .register(0, TEST_QUOTA)
1178                    .await
1179                    .unwrap();
1180                peers.push(pk);
1181                senders.push(sender);
1182                receivers.push(receiver);
1183            }
1184            track_peers(&oracle, peers.iter().cloned()).await;
1185
1186            // Set bandwidth limits for all peers
1187            for pk in &peers {
1188                oracle
1189                    .limit_bandwidth(pk.clone(), Some(EFFECTIVE_BPS), Some(EFFECTIVE_BPS))
1190                    .await
1191                    .unwrap();
1192            }
1193
1194            // Link all peers to the main peer (peers[0]) with zero latency
1195            for peer in peers.iter().skip(1) {
1196                oracle
1197                    .add_link(
1198                        peer.clone(),
1199                        peers[0].clone(),
1200                        Link {
1201                            latency: Duration::ZERO,
1202                            jitter: Duration::ZERO,
1203                            success_rate: 1.0,
1204                        },
1205                    )
1206                    .await
1207                    .unwrap();
1208                oracle
1209                    .add_link(
1210                        peers[0].clone(),
1211                        peer.clone(),
1212                        Link {
1213                            latency: Duration::ZERO,
1214                            jitter: Duration::ZERO,
1215                            success_rate: 1.0,
1216                        },
1217                    )
1218                    .await
1219                    .unwrap();
1220            }
1221
1222            // One-to-many (main peer sends to all others). Verifies that bandwidth limits
1223            // are properly enforced when sending to multiple recipients
1224            let start = context.current();
1225
1226            // Send message to all peers concurrently
1227            // and wait for all sends to be acknowledged
1228            let msg = IoBuf::from(vec![0u8; MESSAGE_SIZE]);
1229            for peer in peers.iter().skip(1) {
1230                senders[0].send(Recipients::One(peer.clone()), msg.clone(), true);
1231            }
1232
1233            // Verify all messages are received
1234            for receiver in receivers.iter_mut().skip(1) {
1235                let (origin, received) = receiver.recv().await.unwrap();
1236                assert_eq!(origin, peers[0]);
1237                assert_eq!(received.len(), MESSAGE_SIZE);
1238            }
1239
1240            let elapsed = context.current().duration_since(start).unwrap();
1241
1242            // Calculate expected time
1243            let expected_ms = (NUM_PEERS * MESSAGE_SIZE * 1000) / EFFECTIVE_BPS;
1244
1245            assert!(
1246                elapsed >= Duration::from_millis(expected_ms as u64),
1247                "One-to-many completed too quickly: {elapsed:?} (expected >= {expected_ms}ms)"
1248            );
1249            assert!(
1250                elapsed < Duration::from_millis((expected_ms as u64) + 500),
1251                "One-to-many took too long: {elapsed:?} (expected ~{expected_ms}ms)"
1252            );
1253
1254            // Many-to-one (all peers send to the main peer)
1255            let start = context.current();
1256
1257            // Each peer sends a message to the main peer concurrently and we wait for all
1258            // sends to be acknowledged
1259            let msg = IoBuf::from(vec![0; MESSAGE_SIZE]);
1260            for mut sender in senders.into_iter().skip(1) {
1261                sender.send(Recipients::One(peers[0].clone()), msg.clone(), true);
1262            }
1263
1264            // Collect all messages at the main peer
1265            let mut received_from = HashSet::new();
1266            for _ in 1..=NUM_PEERS {
1267                let (origin, received) = receivers[0].recv().await.unwrap();
1268                assert_eq!(received.len(), MESSAGE_SIZE);
1269                assert!(
1270                    received_from.insert(origin.clone()),
1271                    "Received duplicate from {origin:?}"
1272                );
1273            }
1274
1275            let elapsed = context.current().duration_since(start).unwrap();
1276
1277            // Calculate expected time
1278            let expected_ms = (NUM_PEERS * MESSAGE_SIZE * 1000) / EFFECTIVE_BPS;
1279
1280            assert!(
1281                elapsed >= Duration::from_millis(expected_ms as u64),
1282                "Many-to-one completed too quickly: {elapsed:?} (expected >= {expected_ms}ms)"
1283            );
1284            assert!(
1285                elapsed < Duration::from_millis((expected_ms as u64) + 500),
1286                "Many-to-one took too long: {elapsed:?} (expected ~{expected_ms}ms)"
1287            );
1288
1289            // Verify we received from all peers
1290            assert_eq!(received_from.len(), NUM_PEERS);
1291            for peer in peers.iter().skip(1) {
1292                assert!(received_from.contains(peer));
1293            }
1294        });
1295    }
1296
1297    #[test]
1298    fn test_message_ordering() {
1299        // Test that messages arrive in order even with variable latency
1300        let executor = deterministic::Runner::default();
1301        executor.start(|context| async move {
1302            let (network, oracle) = Network::new(
1303                context.child("network"),
1304                Config {
1305                    max_size: 1024 * 1024,
1306                    disconnect_on_block: true,
1307                    tracked_peer_sets: NZUsize!(1),
1308                },
1309            );
1310            network.start();
1311
1312            // Register agents
1313            let pk1 = PrivateKey::from_seed(1).public_key();
1314            let pk2 = PrivateKey::from_seed(2).public_key();
1315            let (mut sender, _) = oracle
1316                .control(pk1.clone())
1317                .register(0, TEST_QUOTA)
1318                .await
1319                .unwrap();
1320            let (_, mut receiver) = oracle
1321                .control(pk2.clone())
1322                .register(0, TEST_QUOTA)
1323                .await
1324                .unwrap();
1325            track_peers(&oracle, [pk1.clone(), pk2.clone()]).await;
1326
1327            // Link agents with high jitter to create variable delays
1328            oracle
1329                .add_link(
1330                    pk1.clone(),
1331                    pk2.clone(),
1332                    Link {
1333                        latency: Duration::from_millis(50),
1334                        jitter: Duration::from_millis(40),
1335                        success_rate: 1.0,
1336                    },
1337                )
1338                .await
1339                .unwrap();
1340
1341            // Send multiple messages that should arrive in order
1342            let messages = vec![
1343                IoBuf::from(b"message 1"),
1344                IoBuf::from(b"message 2"),
1345                IoBuf::from(b"message 3"),
1346                IoBuf::from(b"message 4"),
1347                IoBuf::from(b"message 5"),
1348            ];
1349
1350            for msg in messages.clone() {
1351                sender.send(Recipients::One(pk2.clone()), msg, true);
1352            }
1353
1354            // Receive messages and verify they arrive in order
1355            for expected_msg in messages {
1356                let (origin, received_msg) = receiver.recv().await.unwrap();
1357                assert_eq!(origin, pk1);
1358                assert_eq!(received_msg, expected_msg);
1359            }
1360        })
1361    }
1362
1363    #[test]
1364    fn test_high_latency_message_blocks_followup() {
1365        let executor = deterministic::Runner::default();
1366        executor.start(|context| async move {
1367            let (network, oracle) = Network::new(
1368                context.child("network"),
1369                Config {
1370                    max_size: 1024 * 1024,
1371                    disconnect_on_block: true,
1372                    tracked_peer_sets: NZUsize!(1),
1373                },
1374            );
1375            network.start();
1376
1377            let pk1 = PrivateKey::from_seed(1).public_key();
1378            let pk2 = PrivateKey::from_seed(2).public_key();
1379            let (mut sender, _) = oracle.control(pk1.clone()).register(0, TEST_QUOTA).await.unwrap();
1380            let (_, mut receiver) = oracle.control(pk2.clone()).register(0, TEST_QUOTA).await.unwrap();
1381            track_peers(&oracle, [pk1.clone(), pk2.clone()]).await;
1382
1383            const BPS: usize = 1_000;
1384            oracle
1385                .limit_bandwidth(pk1.clone(), Some(BPS), None)
1386                .await
1387                .unwrap();
1388            oracle
1389                .limit_bandwidth(pk2.clone(), None, Some(BPS))
1390                .await
1391                .unwrap();
1392
1393            // Send slow message
1394            oracle
1395                .add_link(
1396                    pk1.clone(),
1397                    pk2.clone(),
1398                    Link {
1399                        latency: Duration::from_millis(5_000),
1400                        jitter: Duration::ZERO,
1401                        success_rate: 1.0,
1402                    },
1403                )
1404                .await
1405                .unwrap();
1406
1407            let egress_time = Duration::from_secs(1);
1408            let start = context.current();
1409            let slow = IoBuf::from(vec![0u8; 1_000]);
1410            sender
1411                .send(Recipients::One(pk2.clone()), slow.clone(), true);
1412
1413            // Update link
1414            oracle.remove_link(pk1.clone(), pk2.clone()).await.unwrap();
1415            oracle
1416                .add_link(
1417                    pk1.clone(),
1418                    pk2.clone(),
1419                    Link {
1420                        latency: Duration::from_millis(1),
1421                        jitter: Duration::ZERO,
1422                        success_rate: 1.0,
1423                    },
1424                )
1425                .await
1426                .unwrap();
1427
1428            // Send fast message
1429            let fast = IoBuf::from(vec![1u8; 1_000]);
1430            sender
1431                .send(Recipients::One(pk2.clone()), fast.clone(), true);
1432
1433            let (origin1, message1) = receiver.recv().await.unwrap();
1434            assert_eq!(origin1, pk1);
1435            assert_eq!(message1, slow);
1436            let first_elapsed = context.current().duration_since(start).unwrap();
1437
1438            let (origin2, message2) = receiver.recv().await.unwrap();
1439            let second_elapsed = context.current().duration_since(start).unwrap();
1440            assert_eq!(origin2, pk1);
1441            assert_eq!(message2, fast);
1442
1443            let slow_latency = Duration::from_millis(5_000);
1444            let expected_first = egress_time + slow_latency;
1445            let tolerance = Duration::from_millis(10);
1446            assert!(
1447                first_elapsed >= expected_first.saturating_sub(tolerance)
1448                    && first_elapsed <= expected_first + tolerance,
1449                "slow message arrived outside expected window: {first_elapsed:?} (expected {expected_first:?} ± {tolerance:?})"
1450            );
1451            assert!(
1452                second_elapsed >= first_elapsed,
1453                "fast message arrived before slow transmission completed"
1454            );
1455
1456            let arrival_gap = second_elapsed
1457                .checked_sub(first_elapsed)
1458                .expect("timestamps ordered");
1459            assert!(
1460                arrival_gap >= egress_time.saturating_sub(tolerance)
1461                    && arrival_gap <= egress_time + tolerance,
1462                "next arrival deviated from transmit duration (gap = {arrival_gap:?}, expected {egress_time:?} ± {tolerance:?})"
1463            );
1464        })
1465    }
1466
1467    #[test]
1468    fn test_many_to_one_bandwidth_sharing() {
1469        let executor = deterministic::Runner::default();
1470        executor.start(|context| async move {
1471            let (network, oracle) = Network::new(
1472                context.child("network"),
1473                Config {
1474                    max_size: 1024 * 1024,
1475                    disconnect_on_block: true,
1476                    tracked_peer_sets: NZUsize!(1),
1477                },
1478            );
1479            network.start();
1480
1481            // Create 10 senders and 1 receiver
1482            let mut senders = Vec::new();
1483            let mut sender_txs = Vec::new();
1484            for i in 0..10 {
1485                let sender = ed25519::PrivateKey::from_seed(i).public_key();
1486                senders.push(sender.clone());
1487                let (tx, _) = oracle
1488                    .control(sender.clone())
1489                    .register(0, TEST_QUOTA)
1490                    .await
1491                    .unwrap();
1492                sender_txs.push(tx);
1493
1494                // Each sender has 10KB/s egress
1495                oracle
1496                    .limit_bandwidth(sender.clone(), Some(10_000), None)
1497                    .await
1498                    .unwrap();
1499            }
1500
1501            let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1502            let (_, mut receiver_rx) = oracle
1503                .control(receiver.clone())
1504                .register(0, TEST_QUOTA)
1505                .await
1506                .unwrap();
1507            track_peers(&oracle, senders.iter().cloned().chain([receiver.clone()])).await;
1508
1509            // Receiver has 100KB/s ingress
1510            oracle
1511                .limit_bandwidth(receiver.clone(), None, Some(100_000))
1512                .await
1513                .unwrap();
1514
1515            // Add links with no latency
1516            for sender in &senders {
1517                oracle
1518                    .add_link(
1519                        sender.clone(),
1520                        receiver.clone(),
1521                        Link {
1522                            latency: Duration::ZERO,
1523                            jitter: Duration::ZERO,
1524                            success_rate: 1.0,
1525                        },
1526                    )
1527                    .await
1528                    .unwrap();
1529            }
1530
1531            let start = context.current();
1532
1533            // All senders send 10KB simultaneously
1534            for (i, mut tx) in sender_txs.into_iter().enumerate() {
1535                let receiver_clone = receiver.clone();
1536                let msg = IoBuf::from(vec![i as u8; 10_000]);
1537                tx.send(Recipients::One(receiver_clone), msg, true);
1538            }
1539
1540            // All 10 messages should be received at ~1s
1541            // (100KB total data at 100KB/s aggregate bandwidth)
1542            for i in 0..10 {
1543                let (_, _msg) = receiver_rx.recv().await.unwrap();
1544                let recv_time = context.current().duration_since(start).unwrap();
1545
1546                // Messages should all complete around 1s
1547                assert!(
1548                    recv_time >= Duration::from_millis(950)
1549                        && recv_time <= Duration::from_millis(1100),
1550                    "Message {i} received at {recv_time:?}, expected ~1s",
1551                );
1552            }
1553        });
1554    }
1555
1556    #[test]
1557    fn test_one_to_many_fast_sender() {
1558        // Test that 1 fast sender (100KB/s) sending to 10 receivers (10KB/s each)
1559        // should complete all sends in ~1s and all messages received in ~1s
1560        let executor = deterministic::Runner::default();
1561        executor.start(|context| async move {
1562            let (network, oracle) = Network::new(
1563                context.child("network"),
1564                Config {
1565                    max_size: 1024 * 1024,
1566                    disconnect_on_block: true,
1567                    tracked_peer_sets: NZUsize!(1),
1568                },
1569            );
1570            network.start();
1571
1572            // Create fast sender
1573            let sender = ed25519::PrivateKey::from_seed(0).public_key();
1574            let (mut sender_tx, _) = oracle
1575                .control(sender.clone())
1576                .register(0, TEST_QUOTA)
1577                .await
1578                .unwrap();
1579
1580            // Sender has 100KB/s egress
1581            oracle
1582                .limit_bandwidth(sender.clone(), Some(100_000), None)
1583                .await
1584                .unwrap();
1585
1586            // Create 10 receivers
1587            let mut receivers = Vec::new();
1588            let mut receiver_rxs = Vec::new();
1589            for i in 0..10 {
1590                let receiver = ed25519::PrivateKey::from_seed(i + 1).public_key();
1591                receivers.push(receiver.clone());
1592                let (_, rx) = oracle
1593                    .control(receiver.clone())
1594                    .register(0, TEST_QUOTA)
1595                    .await
1596                    .unwrap();
1597                receiver_rxs.push(rx);
1598
1599                // Each receiver has 10KB/s ingress
1600                oracle
1601                    .limit_bandwidth(receiver.clone(), None, Some(10_000))
1602                    .await
1603                    .unwrap();
1604
1605                // Add link with no latency
1606                oracle
1607                    .add_link(
1608                        sender.clone(),
1609                        receiver.clone(),
1610                        Link {
1611                            latency: Duration::ZERO,
1612                            jitter: Duration::ZERO,
1613                            success_rate: 1.0,
1614                        },
1615                    )
1616                    .await
1617                    .unwrap();
1618            }
1619            track_peers(
1620                &oracle,
1621                core::iter::once(sender.clone()).chain(receivers.iter().cloned()),
1622            )
1623            .await;
1624
1625            let start = context.current();
1626
1627            // Send 10KB to each receiver (100KB total)
1628            for (i, receiver) in receivers.iter().enumerate() {
1629                let msg = IoBuf::from(vec![i as u8; 10_000]);
1630                sender_tx.send(Recipients::One(receiver.clone()), msg, true);
1631            }
1632
1633            // Each receiver should receive their 10KB message in ~1s (10KB at 10KB/s)
1634            for (i, mut rx) in receiver_rxs.into_iter().enumerate() {
1635                let (_, msg) = rx.recv().await.unwrap();
1636                assert_eq!(msg.as_ref()[0], i as u8);
1637                let recv_time = context.current().duration_since(start).unwrap();
1638
1639                // All messages should be received around 1s
1640                assert!(
1641                    recv_time >= Duration::from_millis(950)
1642                        && recv_time <= Duration::from_millis(1100),
1643                    "Receiver {i} received at {recv_time:?}, expected ~1s",
1644                );
1645            }
1646        });
1647    }
1648
1649    #[test]
1650    fn test_many_slow_senders_to_fast_receiver() {
1651        // Test that 10 slow senders (1KB/s each) sending to a fast receiver (10KB/s)
1652        // should complete all transfers in ~1s
1653        let executor = deterministic::Runner::default();
1654        executor.start(|context| async move {
1655            let (network, oracle) = Network::new(
1656                context.child("network"),
1657                Config {
1658                    max_size: 1024 * 1024,
1659                    disconnect_on_block: true,
1660                    tracked_peer_sets: NZUsize!(1),
1661                },
1662            );
1663            network.start();
1664
1665            // Create 10 slow senders
1666            let mut senders = Vec::new();
1667            let mut sender_txs = Vec::new();
1668            for i in 0..10 {
1669                let sender = ed25519::PrivateKey::from_seed(i).public_key();
1670                senders.push(sender.clone());
1671                let (tx, _) = oracle
1672                    .control(sender.clone())
1673                    .register(0, TEST_QUOTA)
1674                    .await
1675                    .unwrap();
1676                sender_txs.push(tx);
1677
1678                // Each sender has 1KB/s egress (slow)
1679                oracle
1680                    .limit_bandwidth(sender.clone(), Some(1_000), None)
1681                    .await
1682                    .unwrap();
1683            }
1684
1685            // Create fast receiver
1686            let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1687            let (_, mut receiver_rx) = oracle
1688                .control(receiver.clone())
1689                .register(0, TEST_QUOTA)
1690                .await
1691                .unwrap();
1692            track_peers(&oracle, senders.iter().cloned().chain([receiver.clone()])).await;
1693
1694            // Receiver has 10KB/s ingress (can handle all 10 senders at full speed)
1695            oracle
1696                .limit_bandwidth(receiver.clone(), None, Some(10_000))
1697                .await
1698                .unwrap();
1699
1700            // Add links with no latency
1701            for sender in &senders {
1702                oracle
1703                    .add_link(
1704                        sender.clone(),
1705                        receiver.clone(),
1706                        Link {
1707                            latency: Duration::ZERO,
1708                            jitter: Duration::ZERO,
1709                            success_rate: 1.0,
1710                        },
1711                    )
1712                    .await
1713                    .unwrap();
1714            }
1715
1716            let start = context.current();
1717
1718            // All senders send 1KB simultaneously
1719            for (i, mut tx) in sender_txs.into_iter().enumerate() {
1720                let receiver_clone = receiver.clone();
1721                let msg = IoBuf::from(vec![i as u8; 1_000]);
1722                tx.send(Recipients::One(receiver_clone), msg, true);
1723            }
1724
1725            // Each sender takes 1s to transmit 1KB at 1KB/s
1726            // All transmissions happen in parallel, so total send time is ~1s
1727
1728            // All 10 messages (10KB total) should be received at ~1s
1729            // Receiver processes at 10KB/s, can handle all 10KB in 1s
1730            for i in 0..10 {
1731                let (_, _msg) = receiver_rx.recv().await.unwrap();
1732                let recv_time = context.current().duration_since(start).unwrap();
1733
1734                // All messages should complete around 1s
1735                assert!(
1736                    recv_time >= Duration::from_millis(950)
1737                        && recv_time <= Duration::from_millis(1100),
1738                    "Message {i} received at {recv_time:?}, expected ~1s",
1739                );
1740            }
1741        });
1742    }
1743
1744    #[test]
1745    fn test_dynamic_bandwidth_allocation_staggered() {
1746        // Test that bandwidth is dynamically allocated as
1747        // transfers start and complete at different times
1748        //
1749        // 3 senders to 1 receiver, starting at different times
1750        // Receiver has 30KB/s, senders each have 30KB/s
1751        let executor = deterministic::Runner::default();
1752        executor.start(|context| async move {
1753            let (network, oracle) = Network::new(
1754                context.child("network"),
1755                Config {
1756                    max_size: 1024 * 1024,
1757                    disconnect_on_block: true,
1758                    tracked_peer_sets: NZUsize!(1),
1759                },
1760            );
1761            network.start();
1762
1763            // Create 3 senders
1764            let mut senders = Vec::new();
1765            let mut sender_txs = Vec::new();
1766            for i in 0..3 {
1767                let sender = ed25519::PrivateKey::from_seed(i).public_key();
1768                senders.push(sender.clone());
1769                let (tx, _) = oracle
1770                    .control(sender.clone())
1771                    .register(0, TEST_QUOTA)
1772                    .await
1773                    .unwrap();
1774                sender_txs.push(tx);
1775
1776                // Each sender has 30KB/s egress
1777                oracle
1778                    .limit_bandwidth(sender.clone(), Some(30_000), None)
1779                    .await
1780                    .unwrap();
1781            }
1782
1783            // Create receiver with 30KB/s ingress
1784            let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1785            let (_, mut receiver_rx) = oracle
1786                .control(receiver.clone())
1787                .register(0, TEST_QUOTA)
1788                .await
1789                .unwrap();
1790            track_peers(&oracle, senders.iter().cloned().chain([receiver.clone()])).await;
1791            oracle
1792                .limit_bandwidth(receiver.clone(), None, Some(30_000))
1793                .await
1794                .unwrap();
1795
1796            // Add links with minimal latency
1797            for sender in &senders {
1798                oracle
1799                    .add_link(
1800                        sender.clone(),
1801                        receiver.clone(),
1802                        Link {
1803                            latency: Duration::from_millis(1),
1804                            jitter: Duration::ZERO,
1805                            success_rate: 1.0,
1806                        },
1807                    )
1808                    .await
1809                    .unwrap();
1810            }
1811
1812            let start = context.current();
1813            let mut sender_txs = sender_txs.into_iter();
1814
1815            // Sender 0: sends 30KB at t=0
1816            // Gets full 30KB/s for the first 0.5s, then shares with sender 1
1817            // at 15KB/s until completion at t=1.5s
1818            let mut tx0 = sender_txs.next().expect("missing sender 0");
1819            let rx_clone = receiver.clone();
1820            context.child("task").spawn(move |_| async move {
1821                let msg = IoBuf::from(vec![0u8; 30_000]);
1822                tx0.send(Recipients::One(rx_clone), msg, true);
1823            });
1824
1825            // Sender 1: sends 30KB at t=0.5s
1826            // Shares bandwidth with sender 0 (15KB/s each) until t=1.5s,
1827            // then gets the full 30KB/s
1828            let mut tx1 = sender_txs.next().expect("missing sender 1");
1829            let rx_clone = receiver.clone();
1830            context.child("task").spawn(move |context| async move {
1831                context.sleep(Duration::from_millis(500)).await;
1832                let msg = IoBuf::from(vec![1u8; 30_000]);
1833                tx1.send(Recipients::One(rx_clone), msg, true);
1834            });
1835
1836            // Sender 2: sends 15KB at t=1.5s and shares the receiver with
1837            // sender 1, completing at roughly t=2.5s
1838            let mut tx2 = sender_txs.next().expect("missing sender 2");
1839            let rx_clone = receiver.clone();
1840            context.child("task").spawn(move |context| async move {
1841                context.sleep(Duration::from_millis(1500)).await;
1842                let msg = IoBuf::from(vec![2u8; 15_000]);
1843                tx2.send(Recipients::One(rx_clone), msg, true);
1844            });
1845
1846            // Receive and verify timing
1847            // Message 0: starts at t=0, shares bandwidth after 0.5s,
1848            // and completes at t=1.5s (plus link latency)
1849            let (_, msg0) = receiver_rx.recv().await.unwrap();
1850            assert_eq!(msg0.as_ref()[0], 0);
1851            let t0 = context.current().duration_since(start).unwrap();
1852            assert!(
1853                t0 >= Duration::from_millis(1490) && t0 <= Duration::from_millis(1600),
1854                "Message 0 received at {t0:?}, expected ~1.5s",
1855            );
1856
1857            // The algorithm may deliver messages in a different order based on
1858            // efficient bandwidth usage. Let's collect the next two messages and
1859            // verify their timings regardless of order.
1860            let (_, msg_a) = receiver_rx.recv().await.unwrap();
1861            let t_a = context.current().duration_since(start).unwrap();
1862
1863            let (_, msg_b) = receiver_rx.recv().await.unwrap();
1864            let t_b = context.current().duration_since(start).unwrap();
1865
1866            // Figure out which message is which based on content
1867            let (msg1, t1, msg2, t2) = if msg_a.as_ref()[0] == 1 {
1868                (msg_a, t_a, msg_b, t_b)
1869            } else {
1870                (msg_b, t_b, msg_a, t_a)
1871            };
1872
1873            assert_eq!(msg1.as_ref()[0], 1);
1874            assert_eq!(msg2.as_ref()[0], 2);
1875
1876            // Message 1 (30KB) started at t=0.5s
1877            // Message 2 (15KB) started at t=1.5s
1878            // With efficient scheduling, message 2 might complete first since it's smaller
1879            // Both should complete between 1.5s and 2.5s
1880            assert!(
1881                t1 >= Duration::from_millis(1500) && t1 <= Duration::from_millis(2600),
1882                "Message 1 received at {t1:?}, expected between 1.5s-2.6s",
1883            );
1884
1885            assert!(
1886                t2 >= Duration::from_millis(1500) && t2 <= Duration::from_millis(2600),
1887                "Message 2 received at {t2:?}, expected between 1.5s-2.6s",
1888            );
1889        });
1890    }
1891
1892    #[test]
1893    fn test_dynamic_bandwidth_varied_sizes() {
1894        // Test dynamic allocation with different message sizes arriving simultaneously
1895        // This tests that smaller messages complete first when bandwidth is shared
1896        let executor = deterministic::Runner::default();
1897        executor.start(|context| async move {
1898            let (network, oracle) = Network::new(
1899                context.child("network"),
1900                Config {
1901                    max_size: 1024 * 1024,
1902                    disconnect_on_block: true,
1903                    tracked_peer_sets: NZUsize!(1),
1904                },
1905            );
1906            network.start();
1907
1908            // Create 3 senders
1909            let mut senders = Vec::new();
1910            let mut sender_txs = Vec::new();
1911            for i in 0..3 {
1912                let sender = ed25519::PrivateKey::from_seed(i).public_key();
1913                senders.push(sender.clone());
1914                let (tx, _) = oracle
1915                    .control(sender.clone())
1916                    .register(0, TEST_QUOTA)
1917                    .await
1918                    .unwrap();
1919                sender_txs.push(tx);
1920
1921                // Each sender has unlimited egress
1922                oracle
1923                    .limit_bandwidth(sender.clone(), None, None)
1924                    .await
1925                    .unwrap();
1926            }
1927
1928            // Create receiver with 30KB/s ingress
1929            let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1930            let (_, mut receiver_rx) = oracle
1931                .control(receiver.clone())
1932                .register(0, TEST_QUOTA)
1933                .await
1934                .unwrap();
1935            track_peers(&oracle, senders.iter().cloned().chain([receiver.clone()])).await;
1936            oracle
1937                .limit_bandwidth(receiver.clone(), None, Some(30_000))
1938                .await
1939                .unwrap();
1940
1941            // Add links
1942            for sender in &senders {
1943                oracle
1944                    .add_link(
1945                        sender.clone(),
1946                        receiver.clone(),
1947                        Link {
1948                            latency: Duration::from_millis(1),
1949                            jitter: Duration::ZERO,
1950                            success_rate: 1.0,
1951                        },
1952                    )
1953                    .await
1954                    .unwrap();
1955            }
1956
1957            let start = context.current();
1958
1959            // All start at the same time but with different sizes
1960            //
1961            // The scheduler reserves bandwidth in advance, the actual behavior
1962            // depends on the order tasks are processed. Since all senders
1963            // start at once, they'll compete for bandwidth
1964            let sizes = [10_000, 20_000, 30_000];
1965            for (i, (mut tx, size)) in sender_txs.into_iter().zip(sizes.iter()).enumerate() {
1966                let rx_clone = receiver.clone();
1967                let msg_size = *size;
1968                let msg = IoBuf::from(vec![i as u8; msg_size]);
1969                tx.send(Recipients::One(rx_clone), msg, true);
1970            }
1971
1972            // Receive messages. They arrive in the order they were scheduled,
1973            // not necessarily size order. Collect all messages and sort by
1974            // receive time to verify timing
1975            let mut messages = Vec::new();
1976            for _ in 0..3 {
1977                let (_, msg) = receiver_rx.recv().await.unwrap();
1978                let t = context.current().duration_since(start).unwrap();
1979                messages.push((msg.as_ref()[0] as usize, msg.len(), t));
1980            }
1981
1982            // When all start at once, they'll reserve bandwidth slots
1983            // sequentially. First gets full 30KB/s, others wait or get
1984            // remaining bandwidth. Just verify all messages arrived and total
1985            // time is reasonable
1986            assert_eq!(messages.len(), 3);
1987
1988            // Total data is 60KB at 30KB/s receiver ingress = 2s minimum
1989            let max_time = messages.iter().map(|&(_, _, t)| t).max().unwrap();
1990            assert!(
1991                max_time >= Duration::from_millis(2000),
1992                "Total time {max_time:?} should be at least 2s for 60KB at 30KB/s",
1993            );
1994        });
1995    }
1996
1997    #[test]
1998    fn test_bandwidth_pipe_reservation_duration() {
1999        // Test that bandwidth pipe is only reserved for transmission duration, not latency
2000        // This means new messages can start transmitting while others are still in flight
2001        let executor = deterministic::Runner::default();
2002        executor.start(|context| async move {
2003            let (network, oracle) = Network::new(
2004                context.child("network"),
2005                Config {
2006                    max_size: 1024 * 1024,
2007                    disconnect_on_block: true,
2008                    tracked_peer_sets: NZUsize!(1),
2009                },
2010            );
2011            network.start();
2012
2013            // Create two peers
2014            let sender = PrivateKey::from_seed(1).public_key();
2015            let receiver = PrivateKey::from_seed(2).public_key();
2016
2017            let (mut sender_tx, _) = oracle
2018                .control(sender.clone())
2019                .register(0, TEST_QUOTA)
2020                .await
2021                .unwrap();
2022            let (_, mut receiver_rx) = oracle
2023                .control(receiver.clone())
2024                .register(0, TEST_QUOTA)
2025                .await
2026                .unwrap();
2027            track_peers(&oracle, [sender.clone(), receiver.clone()]).await;
2028
2029            // Set bandwidth: 1000 B/s (1 byte per millisecond)
2030            oracle
2031                .limit_bandwidth(sender.clone(), Some(1000), None)
2032                .await
2033                .unwrap();
2034            oracle
2035                .limit_bandwidth(receiver.clone(), None, Some(1000))
2036                .await
2037                .unwrap();
2038
2039            // Add link with significant latency (1 second)
2040            oracle
2041                .add_link(
2042                    sender.clone(),
2043                    receiver.clone(),
2044                    Link {
2045                        latency: Duration::from_secs(1), // 1 second latency
2046                        jitter: Duration::ZERO,
2047                        success_rate: 1.0,
2048                    },
2049                )
2050                .await
2051                .unwrap();
2052
2053            // Send 3 messages of 500 bytes each
2054            // At 1000 B/s, each message takes 500ms to transmit
2055            // With 1s latency, if pipe was reserved for tx+latency, total would be:
2056            //   - Msg 1: 0-1500ms (500ms tx + 1000ms latency)
2057            //   - Msg 2: 1500-3000ms (starts after msg 1 fully delivered)
2058            //   - Msg 3: 3000-4500ms
2059            // But if pipe is only reserved during tx (correct behavior):
2060            //   - Msg 1: tx 0-500ms, delivered at 1500ms
2061            //   - Msg 2: tx 500-1000ms, delivered at 2000ms
2062            //   - Msg 3: tx 1000-1500ms, delivered at 2500ms
2063            let start = context.current();
2064
2065            // Send all messages in quick succession
2066            for i in 0..3 {
2067                let msg = IoBuf::from(vec![i; 500]);
2068                sender_tx.send(Recipients::One(receiver.clone()), msg, false);
2069            }
2070
2071            // Wait for all receives to complete and record their completion times
2072            let mut receive_times = Vec::new();
2073            for i in 0..3 {
2074                let (_, received) = receiver_rx.recv().await.unwrap();
2075                receive_times.push(context.current().duration_since(start).unwrap());
2076                assert_eq!(received.as_ref()[0], i);
2077            }
2078
2079            // Messages should be received at:
2080            // - Msg 1: ~1500ms (500ms transmission + 1000ms latency)
2081            // - Msg 2: ~2000ms (500ms wait + 500ms transmission + 1000ms latency)
2082            // - Msg 3: ~2500ms (1000ms wait + 500ms transmission + 1000ms latency)
2083            for (i, time) in receive_times.iter().enumerate() {
2084                let expected_min = (i as u64 * 500) + 1500;
2085                let expected_max = expected_min + 100;
2086
2087                assert!(
2088                    *time >= Duration::from_millis(expected_min)
2089                        && *time < Duration::from_millis(expected_max),
2090                    "Message {} should arrive at ~{}ms, got {:?}",
2091                    i + 1,
2092                    expected_min,
2093                    time
2094                );
2095            }
2096        });
2097    }
2098
2099    #[test]
2100    fn test_dynamic_bandwidth_affects_new_transfers() {
2101        // This test verifies that bandwidth changes affect NEW transfers,
2102        // not transfers already in progress (which have their reservations locked in)
2103        let executor = deterministic::Runner::default();
2104        executor.start(|context| async move {
2105            let (network, oracle) = Network::new(
2106                context.child("network"),
2107                Config {
2108                    max_size: 1024 * 1024,
2109                    disconnect_on_block: true,
2110                    tracked_peer_sets: NZUsize!(1),
2111                },
2112            );
2113            network.start();
2114
2115            let pk_sender = PrivateKey::from_seed(1).public_key();
2116            let pk_receiver = PrivateKey::from_seed(2).public_key();
2117
2118            // Register peers and establish link
2119            let (mut sender_tx, _) = oracle
2120                .control(pk_sender.clone())
2121                .register(0, TEST_QUOTA)
2122                .await
2123                .unwrap();
2124            let (_, mut receiver_rx) = oracle
2125                .control(pk_receiver.clone())
2126                .register(0, TEST_QUOTA)
2127                .await
2128                .unwrap();
2129            track_peers(&oracle, [pk_sender.clone(), pk_receiver.clone()]).await;
2130            oracle
2131                .add_link(
2132                    pk_sender.clone(),
2133                    pk_receiver.clone(),
2134                    Link {
2135                        latency: Duration::from_millis(1), // Small latency
2136                        jitter: Duration::ZERO,
2137                        success_rate: 1.0,
2138                    },
2139                )
2140                .await
2141                .unwrap();
2142
2143            // Initial bandwidth: 10 KB/s
2144            oracle
2145                .limit_bandwidth(pk_sender.clone(), Some(10_000), None)
2146                .await
2147                .unwrap();
2148            oracle
2149                .limit_bandwidth(pk_receiver.clone(), None, Some(10_000))
2150                .await
2151                .unwrap();
2152
2153            // Send first message at 10 KB/s
2154            let msg1 = IoBuf::from(vec![1u8; 20_000]); // 20 KB
2155            let start_time = context.current();
2156            sender_tx.send(Recipients::One(pk_receiver.clone()), msg1.clone(), false);
2157
2158            // Receive first message (should take ~2s at 10KB/s)
2159            let (_sender, received_msg1) = receiver_rx.recv().await.unwrap();
2160            let msg1_time = context.current().duration_since(start_time).unwrap();
2161            assert_eq!(received_msg1.len(), 20_000);
2162            assert!(
2163                msg1_time >= Duration::from_millis(1999)
2164                    && msg1_time <= Duration::from_millis(2010),
2165                "First message should take ~2s, got {msg1_time:?}",
2166            );
2167
2168            // Change bandwidth to 2 KB/s
2169            oracle
2170                .limit_bandwidth(pk_sender.clone(), Some(2_000), None)
2171                .await
2172                .unwrap();
2173
2174            // Send second message at new bandwidth
2175            let msg2 = IoBuf::from(vec![2u8; 10_000]); // 10 KB
2176            let msg2_start = context.current();
2177            sender_tx.send(Recipients::One(pk_receiver.clone()), msg2.clone(), false);
2178
2179            // Receive second message (should take ~5s at 2KB/s)
2180            let (_sender, received_msg2) = receiver_rx.recv().await.unwrap();
2181            let msg2_time = context.current().duration_since(msg2_start).unwrap();
2182            assert_eq!(received_msg2.len(), 10_000);
2183            assert!(
2184                msg2_time >= Duration::from_millis(4999)
2185                    && msg2_time <= Duration::from_millis(5010),
2186                "Second message should take ~5s at reduced bandwidth, got {msg2_time:?}",
2187            );
2188        });
2189    }
2190
2191    #[test]
2192    fn test_zero_receiver_ingress_bandwidth() {
2193        let executor = deterministic::Runner::default();
2194        executor.start(|context| async move {
2195            let (network, oracle) = Network::new(
2196                context.child("network"),
2197                Config {
2198                    max_size: 1024 * 1024,
2199                    disconnect_on_block: true,
2200                    tracked_peer_sets: NZUsize!(1),
2201                },
2202            );
2203            network.start();
2204
2205            let pk_sender = PrivateKey::from_seed(1).public_key();
2206            let pk_receiver = PrivateKey::from_seed(2).public_key();
2207
2208            // Register peers and establish link
2209            let (mut sender_tx, _) = oracle
2210                .control(pk_sender.clone())
2211                .register(0, TEST_QUOTA)
2212                .await
2213                .unwrap();
2214            let (_, mut receiver_rx) = oracle
2215                .control(pk_receiver.clone())
2216                .register(0, TEST_QUOTA)
2217                .await
2218                .unwrap();
2219            track_peers(&oracle, [pk_sender.clone(), pk_receiver.clone()]).await;
2220            oracle
2221                .add_link(
2222                    pk_sender.clone(),
2223                    pk_receiver.clone(),
2224                    Link {
2225                        latency: Duration::ZERO,
2226                        jitter: Duration::ZERO,
2227                        success_rate: 1.0,
2228                    },
2229                )
2230                .await
2231                .unwrap();
2232
2233            // Set sender bandwidth to 0
2234            oracle
2235                .limit_bandwidth(pk_receiver.clone(), None, Some(0))
2236                .await
2237                .unwrap();
2238
2239            // Send message to receiver
2240            let msg1 = IoBuf::from(vec![1u8; 20_000]); // 20 KB
2241            let sent = sender_tx.send(Recipients::One(pk_receiver.clone()), msg1.clone(), false);
2242            assert_eq!(sent.len(), 1);
2243            assert_eq!(sent[0], pk_receiver);
2244
2245            // Message should not be received after 10 seconds
2246            select! {
2247                _ = receiver_rx.recv() => {
2248                    panic!("unexpected message");
2249                },
2250                _ = context.sleep(Duration::from_secs(10)) => {},
2251            }
2252
2253            // Unset bandwidth
2254            oracle
2255                .limit_bandwidth(pk_receiver.clone(), None, None)
2256                .await
2257                .unwrap();
2258
2259            // Message should be immediately received
2260            select! {
2261                _ = receiver_rx.recv() => {},
2262                _ = context.sleep(Duration::from_secs(1)) => {
2263                    panic!("timeout");
2264                },
2265            }
2266        });
2267    }
2268
2269    #[test]
2270    fn test_zero_sender_egress_bandwidth() {
2271        let executor = deterministic::Runner::default();
2272        executor.start(|context| async move {
2273            let (network, oracle) = Network::new(
2274                context.child("network"),
2275                Config {
2276                    max_size: 1024 * 1024,
2277                    disconnect_on_block: true,
2278                    tracked_peer_sets: NZUsize!(1),
2279                },
2280            );
2281            network.start();
2282
2283            let pk_sender = PrivateKey::from_seed(1).public_key();
2284            let pk_receiver = PrivateKey::from_seed(2).public_key();
2285
2286            // Register peers and establish link
2287            let (mut sender_tx, _) = oracle
2288                .control(pk_sender.clone())
2289                .register(0, TEST_QUOTA)
2290                .await
2291                .unwrap();
2292            let (_, mut receiver_rx) = oracle
2293                .control(pk_receiver.clone())
2294                .register(0, TEST_QUOTA)
2295                .await
2296                .unwrap();
2297            track_peers(&oracle, [pk_sender.clone(), pk_receiver.clone()]).await;
2298            oracle
2299                .add_link(
2300                    pk_sender.clone(),
2301                    pk_receiver.clone(),
2302                    Link {
2303                        latency: Duration::ZERO,
2304                        jitter: Duration::ZERO,
2305                        success_rate: 1.0,
2306                    },
2307                )
2308                .await
2309                .unwrap();
2310
2311            // Set sender bandwidth to 0
2312            oracle
2313                .limit_bandwidth(pk_sender.clone(), Some(0), None)
2314                .await
2315                .unwrap();
2316
2317            // Send message to receiver
2318            let msg1 = IoBuf::from(vec![1u8; 20_000]); // 20 KB
2319            let sent = sender_tx.send(Recipients::One(pk_receiver.clone()), msg1.clone(), false);
2320            assert_eq!(sent.len(), 1);
2321            assert_eq!(sent[0], pk_receiver);
2322
2323            // Message should not be received after 10 seconds
2324            select! {
2325                _ = receiver_rx.recv() => {
2326                    panic!("unexpected message");
2327                },
2328                _ = context.sleep(Duration::from_secs(10)) => {},
2329            }
2330
2331            // Unset bandwidth
2332            oracle
2333                .limit_bandwidth(pk_sender.clone(), None, None)
2334                .await
2335                .unwrap();
2336
2337            // Message should be immediately received
2338            select! {
2339                _ = receiver_rx.recv() => {},
2340                _ = context.sleep(Duration::from_secs(1)) => {
2341                    panic!("timeout");
2342                },
2343            }
2344        });
2345    }
2346
2347    #[test]
2348    fn register_peer_set() {
2349        let executor = deterministic::Runner::default();
2350        executor.start(|context| async move {
2351            let (network, oracle) = Network::new(
2352                context.child("network"),
2353                Config {
2354                    max_size: 1024 * 1024,
2355                    disconnect_on_block: true,
2356                    tracked_peer_sets: NZUsize!(3),
2357                },
2358            );
2359            network.start();
2360
2361            let mut manager = oracle.manager();
2362            assert_eq!(manager.peer_set(0).await, None);
2363
2364            let pk1 = PrivateKey::from_seed(1).public_key();
2365            let pk2 = PrivateKey::from_seed(2).public_key();
2366            manager.track(0xFF, Set::try_from([pk1.clone(), pk2.clone()]).unwrap());
2367
2368            assert_eq!(
2369                manager.peer_set(0xFF).await.unwrap(),
2370                TrackedPeers::primary(Set::try_from([pk1, pk2]).unwrap())
2371            );
2372        });
2373    }
2374
2375    #[test]
2376    fn test_socket_manager() {
2377        let executor = deterministic::Runner::default();
2378        executor.start(|context| async move {
2379            let (network, oracle) = Network::new(
2380                context.child("network"),
2381                Config {
2382                    max_size: 1024 * 1024,
2383                    disconnect_on_block: true,
2384                    tracked_peer_sets: NZUsize!(3),
2385                },
2386            );
2387            network.start();
2388
2389            let pk1 = PrivateKey::from_seed(1).public_key();
2390            let pk2 = PrivateKey::from_seed(2).public_key();
2391            let addr1: Address = SocketAddr::from(([127, 0, 0, 1], 4000)).into();
2392            let addr2: Address = SocketAddr::from(([127, 0, 0, 1], 4001)).into();
2393
2394            let mut manager = oracle.socket_manager();
2395            manager.track(
2396                1,
2397                Map::<_, Address>::try_from([
2398                    (pk1.clone(), addr1.clone()),
2399                    (pk2.clone(), addr2.clone()),
2400                ])
2401                .unwrap(),
2402            );
2403
2404            let peer_set = manager.peer_set(1).await.expect("peer set missing");
2405            let keys: Vec<_> = Vec::from(peer_set.primary.clone());
2406            assert_eq!(keys, vec![pk1.clone(), pk2.clone()]);
2407
2408            let mut subscription = manager.subscribe().await;
2409            let update = subscription.recv().await.unwrap();
2410            assert_eq!(update.index, 1);
2411            let latest_keys: Vec<_> = Vec::from(update.latest.primary.clone());
2412            assert_eq!(latest_keys, vec![pk1.clone(), pk2.clone()]);
2413            assert!(update.latest.secondary.is_empty());
2414            let all_primary_keys: Vec<_> = Vec::from(update.all.primary.clone());
2415            assert_eq!(all_primary_keys, vec![pk1.clone(), pk2.clone()]);
2416            assert!(update.all.secondary.is_empty());
2417
2418            manager.track(
2419                2,
2420                Map::<_, Address>::try_from([(pk2.clone(), addr2)]).unwrap(),
2421            );
2422
2423            let update = subscription.recv().await.unwrap();
2424            assert_eq!(update.index, 2);
2425            let latest_keys: Vec<_> = Vec::from(update.latest.primary);
2426            assert_eq!(latest_keys, vec![pk2.clone()]);
2427            assert!(update.latest.secondary.is_empty());
2428            let all_primary_keys: Vec<_> = Vec::from(update.all.primary);
2429            assert_eq!(all_primary_keys, vec![pk1, pk2]);
2430            assert!(update.all.secondary.is_empty());
2431        });
2432    }
2433
2434    #[test]
2435    fn test_manager_track_accepts_tracked_peers() {
2436        let executor = deterministic::Runner::default();
2437        executor.start(|context| async move {
2438            let (network, oracle) = Network::new(
2439                context.child("network"),
2440                Config {
2441                    max_size: 1024 * 1024,
2442                    disconnect_on_block: true,
2443                    tracked_peer_sets: NZUsize!(3),
2444                },
2445            );
2446            network.start();
2447
2448            let pk1 = PrivateKey::from_seed(1).public_key();
2449            let pk2 = PrivateKey::from_seed(2).public_key();
2450            let mut manager = oracle.manager();
2451
2452            manager.track(
2453                7,
2454                TrackedPeers::new(
2455                    Set::try_from([pk1.clone()]).unwrap(),
2456                    Set::try_from([pk2]).unwrap(),
2457                ),
2458            );
2459
2460            assert_eq!(
2461                manager.peer_set(7).await.unwrap(),
2462                TrackedPeers::new(
2463                    Set::try_from([pk1]).unwrap(),
2464                    Set::try_from([PrivateKey::from_seed(2).public_key()]).unwrap(),
2465                )
2466            );
2467        });
2468    }
2469
2470    #[test]
2471    fn test_manager_track_tracked_peers_overlap_primary_wins() {
2472        let executor = deterministic::Runner::default();
2473        executor.start(|context| async move {
2474            // pk2 is in both primary and secondary TrackedPeers; stored secondary is pk3 only.
2475            // latest and aggregate.secondary omit pk2; aggregate.primary still includes pk2.
2476            let (network, oracle) = Network::new(
2477                context.child("network"),
2478                Config {
2479                    max_size: 1024 * 1024,
2480                    disconnect_on_block: true,
2481                    tracked_peer_sets: NZUsize!(3),
2482                },
2483            );
2484            network.start();
2485
2486            let pk1 = PrivateKey::from_seed(1).public_key();
2487            let pk2 = PrivateKey::from_seed(2).public_key();
2488            let pk3 = PrivateKey::from_seed(3).public_key();
2489            let mut manager = oracle.manager();
2490
2491            manager.track(
2492                9,
2493                TrackedPeers::new(
2494                    Set::try_from([pk1.clone(), pk2.clone()]).unwrap(),
2495                    Set::try_from([pk2.clone(), pk3.clone()]).unwrap(),
2496                ),
2497            );
2498
2499            assert_eq!(
2500                manager.peer_set(9).await.unwrap(),
2501                TrackedPeers::new(
2502                    Set::try_from([pk1.clone(), pk2.clone()]).unwrap(),
2503                    Set::try_from([pk3.clone()]).unwrap(),
2504                )
2505            );
2506
2507            let mut subscription = manager.subscribe().await;
2508            let update = subscription.recv().await.unwrap();
2509            assert_eq!(update.index, 9);
2510            assert!(update.latest.primary.position(&pk2).is_some());
2511            assert!(update.latest.secondary.position(&pk2).is_none());
2512            assert!(update.latest.secondary.position(&pk3).is_some());
2513            assert!(update.all.secondary.position(&pk2).is_none());
2514            assert!(update.all.primary.position(&pk2).is_some());
2515        });
2516    }
2517
2518    #[test]
2519    fn test_socket_manager_track_accepts_addressable_tracked_peers() {
2520        let executor = deterministic::Runner::default();
2521        executor.start(|context| async move {
2522            let (network, oracle) = Network::new(
2523                context.child("network"),
2524                Config {
2525                    max_size: 1024 * 1024,
2526                    disconnect_on_block: true,
2527                    tracked_peer_sets: NZUsize!(3),
2528                },
2529            );
2530            network.start();
2531
2532            let pk1 = PrivateKey::from_seed(1).public_key();
2533            let pk2 = PrivateKey::from_seed(2).public_key();
2534            let addr1: Address = SocketAddr::from(([127, 0, 0, 1], 4000)).into();
2535            let addr2: Address = SocketAddr::from(([127, 0, 0, 1], 4001)).into();
2536            let mut manager = oracle.socket_manager();
2537
2538            manager.track(
2539                7,
2540                AddressableTrackedPeers::new(
2541                    Map::<_, Address>::try_from([(pk1.clone(), addr1)]).unwrap(),
2542                    Map::<_, Address>::try_from([(pk2, addr2)]).unwrap(),
2543                ),
2544            );
2545
2546            assert_eq!(
2547                manager.peer_set(7).await.unwrap(),
2548                TrackedPeers::new(
2549                    Set::try_from([pk1]).unwrap(),
2550                    Set::try_from([PrivateKey::from_seed(2).public_key()]).unwrap(),
2551                )
2552            );
2553        });
2554    }
2555
2556    #[test]
2557    fn test_socket_manager_track_addressable_overlap_primary_wins() {
2558        let executor = deterministic::Runner::default();
2559        executor.start(|context| async move {
2560            // Same key in primary and secondary maps; primary address and role win (secondary ignored).
2561            let (network, oracle) = Network::new(
2562                context.child("network"),
2563                Config {
2564                    max_size: 1024 * 1024,
2565                    disconnect_on_block: true,
2566                    tracked_peer_sets: NZUsize!(3),
2567                },
2568            );
2569            network.start();
2570
2571            let pk = PrivateKey::from_seed(1).public_key();
2572            let addr_primary: Address = SocketAddr::from(([127, 0, 0, 1], 4000)).into();
2573            let addr_secondary: Address = SocketAddr::from(([127, 0, 0, 1], 5000)).into();
2574            let mut manager = oracle.socket_manager();
2575            let mut subscription = manager.subscribe().await;
2576
2577            manager.track(
2578                11,
2579                AddressableTrackedPeers::new(
2580                    Map::<_, Address>::try_from([(pk.clone(), addr_primary.clone())]).unwrap(),
2581                    Map::<_, Address>::try_from([(pk.clone(), addr_secondary)]).unwrap(),
2582                ),
2583            );
2584
2585            let update = subscription.recv().await.unwrap();
2586            assert_eq!(update.index, 11);
2587            assert_eq!(update.latest.primary.len(), 1);
2588            assert!(update.latest.secondary.is_empty());
2589            assert!(update.all.secondary.is_empty());
2590            assert_eq!(update.latest.primary, Set::try_from([pk.clone()]).unwrap());
2591        });
2592    }
2593
2594    #[test]
2595    fn test_socket_manager_with_asymmetric_addresses() {
2596        let executor = deterministic::Runner::default();
2597        executor.start(|context| async move {
2598            let (network, oracle) = Network::new(
2599                context.child("network"),
2600                Config {
2601                    max_size: 1024 * 1024,
2602                    disconnect_on_block: true,
2603                    tracked_peer_sets: NZUsize!(3),
2604                },
2605            );
2606            network.start();
2607
2608            let pk1 = PrivateKey::from_seed(1).public_key();
2609            let pk2 = PrivateKey::from_seed(2).public_key();
2610
2611            // Use asymmetric addresses where ingress (dial) differs from egress (filter)
2612            let addr1 = Address::Asymmetric {
2613                ingress: Ingress::Socket(SocketAddr::from(([10, 0, 0, 1], 8080))),
2614                egress: SocketAddr::from(([192, 168, 1, 1], 9090)),
2615            };
2616            let addr2 = Address::Asymmetric {
2617                ingress: Ingress::Dns {
2618                    host: hostname!("node2.example.com"),
2619                    port: 8080,
2620                },
2621                egress: SocketAddr::from(([192, 168, 1, 2], 9090)),
2622            };
2623
2624            let mut manager = oracle.socket_manager();
2625            manager.track(
2626                1,
2627                Map::<_, Address>::try_from([(pk1.clone(), addr1), (pk2.clone(), addr2)]).unwrap(),
2628            );
2629
2630            // Verify peer set contains expected keys (addresses are ignored by simulated network)
2631            let peer_set = manager.peer_set(1).await.expect("peer set missing");
2632            let keys: Vec<_> = Vec::from(peer_set.primary);
2633            assert_eq!(keys, vec![pk1.clone(), pk2.clone()]);
2634
2635            // Verify subscription works
2636            let mut subscription = manager.subscribe().await;
2637            let update = subscription.recv().await.unwrap();
2638            assert_eq!(update.index, 1);
2639            let latest_keys: Vec<_> = Vec::from(update.latest.primary);
2640            assert_eq!(latest_keys, vec![pk1, pk2]);
2641            assert!(update.latest.secondary.is_empty());
2642        });
2643    }
2644
2645    #[test]
2646    fn test_peer_set_window_management() {
2647        let executor = deterministic::Runner::default();
2648        executor.start(|context| async move {
2649            let (network, oracle) = Network::new(
2650                context.child("network"),
2651                Config {
2652                    max_size: 1024 * 1024,
2653                    disconnect_on_block: true,
2654                    tracked_peer_sets: NZUsize!(2), // Only track 2 peer sets
2655                },
2656            );
2657            network.start();
2658
2659            // Create 4 peers
2660            let pk1 = PrivateKey::from_seed(1).public_key();
2661            let pk2 = PrivateKey::from_seed(2).public_key();
2662            let pk3 = PrivateKey::from_seed(3).public_key();
2663            let pk4 = PrivateKey::from_seed(4).public_key();
2664
2665            // Register first peer set with pk1 and pk2
2666            let mut manager = oracle.manager();
2667            manager.track(1, Set::try_from(vec![pk1.clone(), pk2.clone()]).unwrap());
2668
2669            // Register channels for all peers
2670            let (mut sender1, _receiver1) = oracle
2671                .control(pk1.clone())
2672                .register(0, TEST_QUOTA)
2673                .await
2674                .unwrap();
2675            let (mut sender2, _receiver2) = oracle
2676                .control(pk2.clone())
2677                .register(0, TEST_QUOTA)
2678                .await
2679                .unwrap();
2680            let (mut sender3, _receiver3) = oracle
2681                .control(pk3.clone())
2682                .register(0, TEST_QUOTA)
2683                .await
2684                .unwrap();
2685            let (_mut_sender4, _receiver4) = oracle
2686                .control(pk4.clone())
2687                .register(0, TEST_QUOTA)
2688                .await
2689                .unwrap();
2690
2691            // Create bidirectional links between all peers
2692            for peer_a in &[pk1.clone(), pk2.clone(), pk3.clone(), pk4.clone()] {
2693                for peer_b in &[pk1.clone(), pk2.clone(), pk3.clone(), pk4.clone()] {
2694                    if peer_a != peer_b {
2695                        oracle
2696                            .add_link(
2697                                peer_a.clone(),
2698                                peer_b.clone(),
2699                                Link {
2700                                    latency: Duration::from_millis(1),
2701                                    jitter: Duration::ZERO,
2702                                    success_rate: 1.0,
2703                                },
2704                            )
2705                            .await
2706                            .unwrap();
2707                    }
2708                }
2709            }
2710
2711            // pk1 can broadcast to pk2, but not pk3 while pk3 is untracked.
2712            let recipients = sender1.check(Recipients::All).unwrap().recipients();
2713            assert_eq!(recipients, vec![pk2.clone()]);
2714            assert!(!recipients.contains(&pk3));
2715
2716            // Register second peer set with pk2 and pk3
2717            manager.track(2, Set::try_from(vec![pk2.clone(), pk3.clone()]).unwrap());
2718            assert!(manager.peer_set(2).await.is_some());
2719
2720            // Now pk3 is in a peer set and pk1 can broadcast to it.
2721            let recipients = sender1.check(Recipients::All).unwrap().recipients();
2722            assert!(recipients.contains(&pk3));
2723
2724            // Register third peer set with pk3 and pk4 (this will evict peer set 1)
2725            manager.track(3, Set::try_from(vec![pk3.clone(), pk4.clone()]).unwrap());
2726            assert!(manager.peer_set(3).await.is_some());
2727
2728            // pk1 should now be removed from all peer sets.
2729            let recipients = sender2.check(Recipients::All).unwrap().recipients();
2730            assert!(!recipients.contains(&pk1));
2731
2732            // pk3 should still be reachable (in sets 2 and 3).
2733            assert!(recipients.contains(&pk3));
2734
2735            // pk4 should be reachable (in set 3).
2736            let recipients = sender3.check(Recipients::All).unwrap().recipients();
2737            assert!(recipients.contains(&pk4));
2738
2739            // Verify peer set contents
2740            let peer_set_2 = manager.peer_set(2).await.unwrap();
2741            assert!(peer_set_2.primary.position(&pk2).is_some());
2742            assert!(peer_set_2.primary.position(&pk3).is_some());
2743
2744            let peer_set_3 = manager.peer_set(3).await.unwrap();
2745            assert!(peer_set_3.primary.position(&pk3).is_some());
2746            assert!(peer_set_3.primary.position(&pk4).is_some());
2747
2748            // Peer set 1 should no longer exist
2749            assert!(manager.peer_set(1).await.is_none());
2750        });
2751    }
2752
2753    #[test]
2754    fn test_connected_subscription_updates_after_track() {
2755        let executor = deterministic::Runner::default();
2756        executor.start(|context| async move {
2757            let (network, oracle) = Network::new(
2758                context.child("network"),
2759                Config {
2760                    max_size: 1024 * 1024,
2761                    disconnect_on_block: true,
2762                    tracked_peer_sets: NZUsize!(2),
2763                },
2764            );
2765            network.start();
2766
2767            let pk1 = PrivateKey::from_seed(1).public_key();
2768            let pk2 = PrivateKey::from_seed(2).public_key();
2769            let (mut sender, _) = oracle
2770                .control(pk1.clone())
2771                .register(0, TEST_QUOTA)
2772                .await
2773                .unwrap();
2774
2775            assert!(sender
2776                .check(Recipients::All)
2777                .unwrap()
2778                .recipients()
2779                .is_empty());
2780
2781            let mut manager = oracle.manager();
2782            manager.track(1, Set::try_from([pk1, pk2.clone()]).unwrap());
2783            assert!(manager.peer_set(1).await.is_some());
2784
2785            assert_eq!(
2786                sender.check(Recipients::All).unwrap().recipients(),
2787                vec![pk2]
2788            );
2789        });
2790    }
2791
2792    #[test]
2793    fn test_sender_removed_from_peer_set_drops_message() {
2794        let executor = deterministic::Runner::default();
2795        executor.start(|context| async move {
2796            // Create a simulated network
2797            let (network, oracle) = Network::new(
2798                context.child("network"),
2799                Config {
2800                    max_size: 1024 * 1024,
2801                    disconnect_on_block: true,
2802                    tracked_peer_sets: NZUsize!(1),
2803                },
2804            );
2805            network.start();
2806            let mut manager = oracle.manager();
2807            let mut subscription = manager.subscribe().await;
2808
2809            // Register a peer set
2810            let sender_pk = PrivateKey::from_seed(1).public_key();
2811            let recipient_pk = PrivateKey::from_seed(2).public_key();
2812            manager.track(
2813                1,
2814                Set::try_from(vec![sender_pk.clone(), recipient_pk.clone()]).unwrap(),
2815            );
2816            let update = subscription.recv().await.unwrap();
2817            assert_eq!(update.index, 1);
2818
2819            // Register channels
2820            let (mut sender, _) = oracle
2821                .control(sender_pk.clone())
2822                .register(0, TEST_QUOTA)
2823                .await
2824                .unwrap();
2825            let (_sender2, mut receiver) = oracle
2826                .control(recipient_pk.clone())
2827                .register(0, TEST_QUOTA)
2828                .await
2829                .unwrap();
2830
2831            // Add link
2832            oracle
2833                .add_link(
2834                    sender_pk.clone(),
2835                    recipient_pk.clone(),
2836                    Link {
2837                        latency: Duration::from_millis(1),
2838                        jitter: Duration::ZERO,
2839                        success_rate: 1.0,
2840                    },
2841                )
2842                .await
2843                .unwrap();
2844
2845            // Send and confirm message
2846            let initial_msg = IoBuf::from(b"tracked");
2847            let sent = sender.send(
2848                Recipients::One(recipient_pk.clone()),
2849                initial_msg.clone(),
2850                false,
2851            );
2852            assert_eq!(sent.len(), 1);
2853            assert_eq!(sent[0], recipient_pk);
2854            let (_pk, received) = receiver.recv().await.unwrap();
2855            assert_eq!(received, initial_msg.clone());
2856
2857            // Register another peer set
2858            let other_pk = PrivateKey::from_seed(3).public_key();
2859            manager.track(
2860                2,
2861                Set::try_from(vec![recipient_pk.clone(), other_pk]).unwrap(),
2862            );
2863            let update = subscription.recv().await.unwrap();
2864            assert_eq!(update.index, 2);
2865
2866            // Explicit sends are accepted locally, but the network drops
2867            // messages from peers no longer in any peer set.
2868            let sent = sender.send(
2869                Recipients::One(recipient_pk.clone()),
2870                IoBuf::from(b"untracked"),
2871                false,
2872            );
2873            assert_eq!(sent, vec![recipient_pk.clone()]);
2874
2875            // Confirm message was not delivered
2876            select! {
2877                _ = receiver.recv() => {
2878                    panic!("unexpected message");
2879                },
2880                _ = context.sleep(Duration::from_secs(10)) => {},
2881            }
2882
2883            // Add a peer back to a peer set
2884            manager.track(
2885                3,
2886                Set::try_from(vec![sender_pk.clone(), recipient_pk.clone()]).unwrap(),
2887            );
2888            let update = subscription.recv().await.unwrap();
2889            assert_eq!(update.index, 3);
2890
2891            // Send message from a peer now back in a peer set
2892            let sent = sender.send(
2893                Recipients::One(recipient_pk.clone()),
2894                initial_msg.clone(),
2895                false,
2896            );
2897            assert_eq!(sent.len(), 1);
2898            assert_eq!(sent[0], recipient_pk);
2899            let (_pk, received) = receiver.recv().await.unwrap();
2900            assert_eq!(received, initial_msg);
2901        });
2902    }
2903
2904    #[test]
2905    fn test_subscribe_to_peer_sets() {
2906        let executor = deterministic::Runner::default();
2907        executor.start(|context| async move {
2908            let (network, oracle) = Network::new(
2909                context.child("network"),
2910                Config {
2911                    max_size: 1024 * 1024,
2912                    disconnect_on_block: true,
2913                    tracked_peer_sets: NZUsize!(2),
2914                },
2915            );
2916            network.start();
2917
2918            // Subscribe to peer set updates
2919            let mut manager = oracle.manager();
2920            let mut subscription = manager.subscribe().await;
2921
2922            // Create peers
2923            let pk1 = PrivateKey::from_seed(1).public_key();
2924            let pk2 = PrivateKey::from_seed(2).public_key();
2925            let pk3 = PrivateKey::from_seed(3).public_key();
2926
2927            // Register first peer set
2928            manager.track(1, Set::try_from(vec![pk1.clone(), pk2.clone()]).unwrap());
2929
2930            // Verify we receive the notification
2931            let update = subscription.recv().await.unwrap();
2932            assert_eq!(update.index, 1);
2933            assert_eq!(
2934                update.latest.primary,
2935                Set::try_from(vec![pk1.clone(), pk2.clone()]).unwrap()
2936            );
2937            assert!(update.latest.secondary.is_empty());
2938            assert_eq!(
2939                update.all.primary,
2940                Set::try_from(vec![pk1.clone(), pk2.clone()]).unwrap()
2941            );
2942            assert!(update.all.secondary.is_empty());
2943
2944            // Register second peer set
2945            manager.track(2, Set::try_from(vec![pk2.clone(), pk3.clone()]).unwrap());
2946
2947            // Verify we receive the notification
2948            let update = subscription.recv().await.unwrap();
2949            assert_eq!(update.index, 2);
2950            assert_eq!(
2951                update.latest.primary,
2952                Set::try_from(vec![pk2.clone(), pk3.clone()]).unwrap()
2953            );
2954            assert!(update.latest.secondary.is_empty());
2955            assert_eq!(
2956                update.all.primary,
2957                vec![pk1.clone(), pk2.clone(), pk3.clone()]
2958                    .try_into()
2959                    .unwrap()
2960            );
2961            assert!(update.all.secondary.is_empty());
2962
2963            // Register third peer set
2964            manager.track(3, Set::try_from(vec![pk1.clone(), pk3.clone()]).unwrap());
2965
2966            // Verify we receive the notification
2967            let update = subscription.recv().await.unwrap();
2968            assert_eq!(update.index, 3);
2969            assert_eq!(
2970                update.latest.primary,
2971                Set::try_from(vec![pk1.clone(), pk3.clone()]).unwrap()
2972            );
2973            assert!(update.latest.secondary.is_empty());
2974            assert_eq!(
2975                update.all.primary,
2976                vec![pk1.clone(), pk2.clone(), pk3.clone()]
2977                    .try_into()
2978                    .unwrap()
2979            );
2980            assert!(update.all.secondary.is_empty());
2981
2982            // Register fourth peer set
2983            manager.track(4, Set::try_from(vec![pk1.clone(), pk3.clone()]).unwrap());
2984
2985            // Verify we receive the notification
2986            let update = subscription.recv().await.unwrap();
2987            assert_eq!(update.index, 4);
2988            assert_eq!(
2989                update.latest.primary,
2990                Set::try_from(vec![pk1.clone(), pk3.clone()]).unwrap()
2991            );
2992            assert!(update.latest.secondary.is_empty());
2993            assert_eq!(
2994                update.all.primary,
2995                Set::try_from(vec![pk1.clone(), pk3.clone()]).unwrap()
2996            );
2997            assert!(update.all.secondary.is_empty());
2998        });
2999    }
3000
3001    #[test]
3002    fn test_multiple_subscriptions() {
3003        let executor = deterministic::Runner::default();
3004        executor.start(|context| async move {
3005            let (network, oracle) = Network::new(
3006                context.child("network"),
3007                Config {
3008                    max_size: 1024 * 1024,
3009                    disconnect_on_block: true,
3010                    tracked_peer_sets: NZUsize!(3),
3011                },
3012            );
3013            network.start();
3014
3015            // Create multiple subscriptions
3016            let mut manager = oracle.manager();
3017            let mut subscription1 = manager.subscribe().await;
3018            let mut subscription2 = manager.subscribe().await;
3019            let mut subscription3 = manager.subscribe().await;
3020
3021            // Create peers
3022            let pk1 = PrivateKey::from_seed(1).public_key();
3023            let pk2 = PrivateKey::from_seed(2).public_key();
3024
3025            // Register a peer set
3026            manager.track(1, Set::try_from(vec![pk1.clone(), pk2.clone()]).unwrap());
3027
3028            // Verify all subscriptions receive the notification
3029            let update1 = subscription1.recv().await.unwrap();
3030            let update2 = subscription2.recv().await.unwrap();
3031            let update3 = subscription3.recv().await.unwrap();
3032
3033            assert_eq!(update1.index, 1);
3034            assert_eq!(update2.index, 1);
3035            assert_eq!(update3.index, 1);
3036
3037            // Drop one subscription
3038            drop(subscription2);
3039
3040            // Register another peer set
3041            manager.track(2, Set::try_from(vec![pk1.clone(), pk2.clone()]).unwrap());
3042
3043            // Verify remaining subscriptions still receive notifications
3044            let update1 = subscription1.recv().await.unwrap();
3045            let update3 = subscription3.recv().await.unwrap();
3046
3047            assert_eq!(update1.index, 2);
3048            assert_eq!(update3.index, 2);
3049        });
3050    }
3051
3052    #[test]
3053    fn test_subscription_includes_self_when_registered() {
3054        let executor = deterministic::Runner::default();
3055        executor.start(|context| async move {
3056            let (network, oracle) = Network::new(
3057                context.child("network"),
3058                Config {
3059                    max_size: 1024 * 1024,
3060                    disconnect_on_block: true,
3061                    tracked_peer_sets: NZUsize!(2),
3062                },
3063            );
3064            network.start();
3065
3066            // Create "self" and "other" peers
3067            let self_pk = PrivateKey::from_seed(0).public_key();
3068            let other_pk = PrivateKey::from_seed(1).public_key();
3069
3070            // Register a channel for self (this creates the peer in the network)
3071            let (_sender, _receiver) = oracle
3072                .control(self_pk.clone())
3073                .register(0, TEST_QUOTA)
3074                .await
3075                .unwrap();
3076
3077            // Subscribe to peer set updates
3078            let mut manager = oracle.manager();
3079            let mut subscription = manager.subscribe().await;
3080
3081            // Register a peer set that does NOT include self
3082            manager.track(1, Set::try_from(vec![other_pk.clone()]).unwrap());
3083
3084            // Receive subscription notification
3085            let update = subscription.recv().await.unwrap();
3086            assert_eq!(update.index, 1);
3087            assert_eq!(update.latest.primary.len(), 1);
3088            assert!(update.latest.secondary.is_empty());
3089            assert_eq!(update.all.primary.len(), 1);
3090            assert!(update.all.secondary.is_empty());
3091
3092            // Self should NOT be in the latest primary set
3093            assert!(
3094                update.latest.primary.position(&self_pk).is_none(),
3095                "latest primary set should not include self"
3096            );
3097            assert!(
3098                update.latest.primary.position(&other_pk).is_some(),
3099                "latest primary set should include other"
3100            );
3101
3102            // Self should NOT be in the peer set (not tracked)
3103            assert!(
3104                update.all.primary.position(&self_pk).is_none(),
3105                "peer set should not include self"
3106            );
3107            assert!(
3108                update.all.primary.position(&other_pk).is_some(),
3109                "peer set should include other"
3110            );
3111
3112            // Now register a peer set that DOES include self
3113            manager.track(
3114                2,
3115                Set::try_from(vec![self_pk.clone(), other_pk.clone()]).unwrap(),
3116            );
3117
3118            let update = subscription.recv().await.unwrap();
3119            assert_eq!(update.index, 2);
3120            assert_eq!(update.latest.primary.len(), 2);
3121            assert!(update.latest.secondary.is_empty());
3122            assert_eq!(update.all.primary.len(), 2);
3123            assert!(update.all.secondary.is_empty());
3124
3125            // Both peers should be in the latest primary set
3126            assert!(
3127                update.latest.primary.position(&self_pk).is_some(),
3128                "latest primary set should include self"
3129            );
3130            assert!(
3131                update.latest.primary.position(&other_pk).is_some(),
3132                "latest primary set should include other"
3133            );
3134
3135            // Both peers should be in the peer set
3136            assert!(
3137                update.all.primary.position(&self_pk).is_some(),
3138                "peer set should include self"
3139            );
3140            assert!(
3141                update.all.primary.position(&other_pk).is_some(),
3142                "peer set should include other"
3143            );
3144        });
3145    }
3146
3147    #[test]
3148    fn test_rate_limiting() {
3149        let executor = deterministic::Runner::default();
3150        executor.start(|context| async move {
3151            let cfg = Config {
3152                max_size: 1024 * 1024,
3153                disconnect_on_block: true,
3154                tracked_peer_sets: NZUsize!(3),
3155            };
3156            // Create two public keys
3157            let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
3158            let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
3159
3160            let (network, oracle) =
3161                Network::new_with_peers(context.child("network"), cfg, [pk1.clone(), pk2.clone()])
3162                    .await;
3163            network.start();
3164
3165            // Register with a very restrictive quota: 1 message per second
3166            let restrictive_quota = Quota::per_second(NZU32!(1));
3167            let control1 = oracle.control(pk1.clone());
3168            let (mut sender, _) = control1.register(0, restrictive_quota).await.unwrap();
3169            let control2 = oracle.control(pk2.clone());
3170            let (_, mut receiver) = control2.register(0, TEST_QUOTA).await.unwrap();
3171
3172            // Add bidirectional links
3173            let link = ingress::Link {
3174                latency: Duration::from_millis(0),
3175                jitter: Duration::from_millis(0),
3176                success_rate: 1.0,
3177            };
3178            oracle
3179                .add_link(pk1.clone(), pk2.clone(), link.clone())
3180                .await
3181                .unwrap();
3182            oracle.add_link(pk2.clone(), pk1, link).await.unwrap();
3183
3184            // First message should succeed immediately
3185            let msg1 = IoBuf::from(b"message1");
3186            let result1 = sender.send(Recipients::One(pk2.clone()), msg1.clone(), false);
3187            assert_eq!(result1.len(), 1, "first message should be sent");
3188
3189            // Verify first message is received
3190            let (_, received1) = receiver.recv().await.unwrap();
3191            assert_eq!(received1, msg1);
3192
3193            // Second message should be rate-limited (quota is 1/sec, no time has passed)
3194            let msg2 = IoBuf::from(b"message2");
3195            let result2 = sender.send(Recipients::One(pk2.clone()), msg2.clone(), false);
3196            assert_eq!(
3197                result2.len(),
3198                0,
3199                "second message should be rate-limited (skipped)"
3200            );
3201
3202            // Advance time by 1 second to allow the rate limiter to reset
3203            context.sleep(Duration::from_secs(1)).await;
3204
3205            // Third message should succeed after waiting
3206            let msg3 = IoBuf::from(b"message3");
3207            let result3 = sender.send(Recipients::One(pk2.clone()), msg3.clone(), false);
3208            assert_eq!(result3.len(), 1, "third message should be sent after wait");
3209
3210            // Verify third message is received
3211            let (_, received3) = receiver.recv().await.unwrap();
3212            assert_eq!(received3, msg3);
3213        });
3214    }
3215
3216    #[test]
3217    fn test_operations_after_shutdown_do_not_panic() {
3218        let executor = deterministic::Runner::default();
3219        executor.start(|context| async move {
3220            let cfg = Config {
3221                max_size: 1024 * 1024,
3222                disconnect_on_block: true,
3223                tracked_peer_sets: NZUsize!(3),
3224            };
3225            // Create peers
3226            let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
3227            let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
3228
3229            let (network, oracle) =
3230                Network::new_with_peers(context.child("network"), cfg, [pk1.clone(), pk2.clone()])
3231                    .await;
3232            let handle = network.start();
3233            let mut manager = oracle.manager();
3234
3235            // Register channels
3236            let control1 = oracle.control(pk1.clone());
3237            let (mut sender, _receiver) = control1.register(0, TEST_QUOTA).await.unwrap();
3238
3239            // Add link
3240            let link = ingress::Link {
3241                latency: Duration::from_millis(10),
3242                jitter: Duration::from_millis(0),
3243                success_rate: 1.0,
3244            };
3245            oracle
3246                .add_link(pk1.clone(), pk2.clone(), link.clone())
3247                .await
3248                .unwrap();
3249            wait_for_task_count(&context, "network", |count| count > 0).await;
3250
3251            // Abort the network
3252            handle.abort();
3253            let _ = handle.await;
3254            wait_for_task_count(&context, "network", |count| count == 0).await;
3255
3256            // Sending messages should not panic and should return empty
3257            let msg = IoBuf::from(b"test");
3258            let result = sender.send(Recipients::One(pk2.clone()), msg, false);
3259            assert!(result.is_empty(), "send after shutdown should return empty");
3260
3261            // Manager operations should not panic
3262            manager.track(1, Set::try_from([pk1.clone()]).unwrap());
3263            let _ = manager.peer_set(0).await;
3264            let _ = manager.subscribe().await;
3265
3266            // Oracle operations should not panic
3267            let _ = oracle
3268                .add_link(pk1.clone(), pk2.clone(), link.clone())
3269                .await;
3270            let _ = oracle.remove_link(pk1.clone(), pk2.clone()).await;
3271            let _ = oracle.blocked().await;
3272
3273            // Control operations should not panic
3274            let _ = control1.register(1, TEST_QUOTA).await;
3275        });
3276    }
3277
3278    fn clean_shutdown(seed: u64) {
3279        let cfg = deterministic::Config::default()
3280            .with_seed(seed)
3281            .with_timeout(Some(Duration::from_secs(30)));
3282        let executor = deterministic::Runner::new(cfg);
3283        executor.start(|context| async move {
3284            let cfg = Config {
3285                max_size: 1024 * 1024,
3286                disconnect_on_block: true,
3287                tracked_peer_sets: NZUsize!(3),
3288            };
3289            // Create peers
3290            let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
3291            let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
3292
3293            let (network, oracle) =
3294                Network::new_with_peers(context.child("network"), cfg, [pk1.clone(), pk2.clone()])
3295                    .await;
3296            let handle = network.start();
3297
3298            // Register channels
3299            let control1 = oracle.control(pk1.clone());
3300            let control2 = oracle.control(pk2.clone());
3301            let (mut sender, _) = control1.register(0, TEST_QUOTA).await.unwrap();
3302            let (_, mut receiver) = control2.register(0, TEST_QUOTA).await.unwrap();
3303
3304            // Add bidirectional links
3305            let link = ingress::Link {
3306                latency: Duration::from_millis(10),
3307                jitter: Duration::from_millis(0),
3308                success_rate: 1.0,
3309            };
3310            oracle
3311                .add_link(pk1.clone(), pk2.clone(), link.clone())
3312                .await
3313                .unwrap();
3314            oracle
3315                .add_link(pk2.clone(), pk1.clone(), link)
3316                .await
3317                .unwrap();
3318
3319            // Wait until the network has started at least one task.
3320            wait_for_task_count(&context, "network", |count| count > 0).await;
3321            let running_before = count_running_tasks(&context, "network");
3322            assert!(
3323                running_before > 0,
3324                "at least one network task should be running"
3325            );
3326
3327            // Send and receive a message to verify network is functional
3328            let msg = IoBuf::from(b"test_message");
3329            let result = sender.send(Recipients::One(pk2.clone()), msg.clone(), false);
3330            assert_eq!(result.len(), 1, "message should be sent");
3331
3332            let (_, received) = receiver.recv().await.unwrap();
3333            assert_eq!(received, msg, "message should be received");
3334
3335            // Abort the network
3336            handle.abort();
3337            let _ = handle.await;
3338
3339            // Wait until task shutdown is reflected in the metrics.
3340            wait_for_task_count(&context, "network", |count| count == 0).await;
3341            let running_after = count_running_tasks(&context, "network");
3342            assert_eq!(
3343                running_after, 0,
3344                "all network tasks should be stopped, but {running_after} still running"
3345            );
3346        });
3347    }
3348
3349    #[test]
3350    fn test_clean_shutdown() {
3351        for seed in 0..25 {
3352            clean_shutdown(seed);
3353        }
3354    }
3355
3356    #[test]
3357    fn test_socket_manager_overwrite() {
3358        let executor = deterministic::Runner::default();
3359        executor.start(|context| async move {
3360            // Create simulated network with peer set tracking
3361            let (network, oracle) = Network::new(
3362                context.child("network"),
3363                Config {
3364                    max_size: 1024 * 1024,
3365                    disconnect_on_block: true,
3366                    tracked_peer_sets: NZUsize!(3),
3367                },
3368            );
3369            network.start();
3370
3371            // Generate keys
3372            let pk1 = PrivateKey::from_seed(1).public_key();
3373            let pk2 = PrivateKey::from_seed(2).public_key();
3374            let _pk3 = PrivateKey::from_seed(3).public_key();
3375
3376            let mut socket_manager = oracle.socket_manager();
3377
3378            // Simulated network ignores addresses, so overwrite is a no-op
3379            let addr: Address = "127.0.0.1:8000".parse::<SocketAddr>().unwrap().into();
3380
3381            // Register a peer set
3382            socket_manager.track(
3383                0,
3384                Map::<PublicKey, Address>::try_from([
3385                    (
3386                        pk1.clone(),
3387                        "127.0.0.1:8001".parse::<SocketAddr>().unwrap().into(),
3388                    ),
3389                    (pk2, "127.0.0.1:8002".parse::<SocketAddr>().unwrap().into()),
3390                ])
3391                .unwrap(),
3392            );
3393
3394            // overwrite is a no-op for simulated network (addresses not used)
3395            socket_manager.overwrite([(pk1, addr)].try_into().unwrap());
3396        });
3397    }
3398
3399    #[test]
3400    fn test_subscribe_returns_current_peer_set() {
3401        let executor = deterministic::Runner::default();
3402        executor.start(|context| async move {
3403            let (network, oracle) = Network::new(
3404                context.child("network"),
3405                Config {
3406                    max_size: 1024 * 1024,
3407                    disconnect_on_block: true,
3408                    tracked_peer_sets: NZUsize!(3),
3409                },
3410            );
3411            network.start();
3412
3413            // Create peers and track them
3414            let pk1 = PrivateKey::from_seed(0).public_key();
3415            let pk2 = PrivateKey::from_seed(1).public_key();
3416            let peers = ordered::Set::try_from(vec![pk1.clone(), pk2.clone()]).unwrap();
3417
3418            let mut manager = oracle.manager();
3419            Manager::track(&mut manager, 0, peers.clone());
3420
3421            // Subscribe after tracking. The current peer set should be
3422            // available immediately on the subscription channel.
3423            let mut subscription = Provider::subscribe(&mut manager).await;
3424            let update = subscription
3425                .try_recv()
3426                .expect("current peer set should be available immediately after subscribe");
3427            assert_eq!(update.index, 0);
3428            assert_eq!(update.latest.primary, peers);
3429            assert!(update.latest.secondary.is_empty());
3430        });
3431    }
3432}