commonware_p2p/simulated/
mod.rs

1//! Simulate networking between peers with configurable link behavior (i.e. drops, latency, corruption, etc.).
2//!
3//! Both peer and link modification can be performed dynamically over the lifetime of the simulated network. This
4//! can be used to mimic transient network partitions, offline nodes (that later connect), and/or degrading link
5//! quality. Messages on a link are delivered in order, and optional per-peer bandwidth limits account for
6//! transmission delay and queueing.
7//!
8//! # Determinism
9//!
10//! `commonware-p2p::simulated` can be run deterministically when paired with `commonware-runtime::deterministic`.
11//! This makes it possible to reproduce an arbitrary order of delivered/dropped messages with a given seed.
12//!
13//! # Bandwidth Simulation
14//!
15//! The simulator provides a realistic model of bandwidth contention where network
16//! capacity is a shared, finite resource. Bandwidth is allocated via progressive
17//! filling to provide max-min fairness.
18//!
19//! _If no bandwidth constraints are provided (default behavior), progressive filling and bandwidth
20//! tracking are not performed (avoiding unnecessary overhead for minimal p2p testing common in CI)._
21//!
22//! ## Core Model
23//!
24//! Whenever a transfer starts or finishes, or a bandwidth limit is updated, we execute a scheduling tick:
25//!
26//! 1. **Collect Active Flows:** Gather every active transfer that still has
27//!    bytes to send. A flow is bound to one sender and to one receiver (if the message will be delivered).
28//! 2. **Compute Progressive Filling:** Run progressive filling to raise the rate of
29//!    every active flow in lock-step until some sender's egress or receiver's ingress
30//!    limit saturates (at which point the flow is frozen and the process repeats with what remains).
31//! 3. **Wait for the Next Event:** Using those rates, determine which flow will
32//!    finish first by computing how long it needs to transmit its remaining
33//!    bytes. Advance simulated time directly to that completion instant (advancing all other flows
34//!    by the bytes transferred over the interval).
35//! 4. **Deliver Message:** Remove the completed flow and pass the message to the receiver. Repeat from step 1
36//!    until all flows are processed.
37//!
38//! _Messages between the same pair of peers remain strictly ordered. When one
39//! message finishes, the next message on that link may begin sending at
40//! `arrival_time - new_latency` so that its first byte arrives immediately after
41//! the previous one is fully received._
42//!
43//! ## Latency vs. Transmission Delay
44//!
45//! The simulation correctly distinguishes between two key components of message delivery:
46//!
47//! - **Transmission Delay:** The time it takes to send all bytes of a message over
48//!   the link. This is determined by the message size and the available bandwidth
49//!   (e.g., a 10KB message on a 10KB/s link has a 1-second transmission delay).
50//! - **Network Latency:** The time it takes for a byte to travel from the sender
51//!   to the receiver, independent of bandwidth. This is configured via the `Link`
52//!   properties.
53//!
//! The final delivery time of a message is the time at which its transmission completes
//! plus the simulated network latency. This model ensures that large messages correctly
56//! occupy the network link for longer periods, affecting other concurrent transfers,
57//! while still accounting for the physical travel time of the data.
58//!
59//! # Example
60//!
61//! ```rust
62//! use commonware_p2p::{Manager, simulated::{Config, Link, Network}};
63//! use commonware_cryptography::{ed25519, PrivateKey, Signer as _, PublicKey as _, };
64//! use commonware_runtime::{deterministic, Metrics, Quota, Runner, Spawner};
65//! use commonware_utils::NZU32;
66//! use std::time::Duration;
67//!
68//! // Generate peers
69//! let peers = vec![
70//!     ed25519::PrivateKey::from_seed(0).public_key(),
71//!     ed25519::PrivateKey::from_seed(1).public_key(),
72//!     ed25519::PrivateKey::from_seed(2).public_key(),
73//!     ed25519::PrivateKey::from_seed(3).public_key(),
74//! ];
75//!
76//! // Configure network
77//! let p2p_cfg = Config {
78//!     max_size: 1024 * 1024, // 1MB
79//!     disconnect_on_block: true,
80//!     tracked_peer_sets: Some(3),
81//! };
82//!
83//! // Rate limit quota (1000 messages per second per peer)
84//! let quota = Quota::per_second(NZU32!(1000));
85//!
86//! // Start context
87//! let executor = deterministic::Runner::seeded(0);
88//! executor.start(|context| async move {
89//!     // Initialize network
90//!     let (network, mut oracle) = Network::new(context.with_label("network"), p2p_cfg);
91//!
92//!     // Start network
93//!     let network_handler = network.start();
94//!
95//!     // Register a peer set
96//!     let mut manager = oracle.manager();
97//!     manager.update(0, peers.clone().try_into().unwrap()).await;
98//!
99//!     let (sender1, receiver1) = oracle.control(peers[0].clone()).register(0, quota).await.unwrap();
100//!     let (sender2, receiver2) = oracle.control(peers[1].clone()).register(0, quota).await.unwrap();
101//!
102//!     // Set bandwidth limits
//!     // peers[0]: 10KB/s egress, unlimited ingress
//!     // peers[1]: unlimited egress, 5KB/s ingress
105//!     oracle.limit_bandwidth(peers[0].clone(), Some(10_000), None).await.unwrap();
106//!     oracle.limit_bandwidth(peers[1].clone(), None, Some(5_000)).await.unwrap();
107//!
108//!     // Link 2 peers
109//!     oracle.add_link(
110//!         peers[0].clone(),
111//!         peers[1].clone(),
112//!         Link {
113//!             latency: Duration::from_millis(5),
114//!             jitter: Duration::from_millis(2),
115//!             success_rate: 0.75,
116//!         },
117//!     ).await.unwrap();
118//!
119//!     // ... Use sender and receiver ...
120//!
121//!     // Update link
122//!     oracle.remove_link(
123//!         peers[0].clone(),
124//!         peers[1].clone(),
125//!     ).await.unwrap();
126//!     oracle.add_link(
127//!         peers[0].clone(),
128//!         peers[1].clone(),
129//!         Link {
130//!             latency: Duration::from_millis(100),
131//!             jitter: Duration::from_millis(25),
132//!             success_rate: 0.8,
133//!         },
134//!     ).await.unwrap();
135//!
136//!     // ... Use sender and receiver ...
137//!
138//!     // Shutdown network
139//!     network_handler.abort();
140//! });
141//! ```
142
143mod bandwidth;
144mod ingress;
145mod metrics;
146mod network;
147mod transmitter;
148
149use thiserror::Error;
150
/// Errors that can occur when interacting with the network.
///
/// Each variant's user-facing text comes from its `#[error(...)]` attribute.
#[derive(Debug, Error)]
pub enum Error {
    /// A message exceeded the configured maximum size.
    ///
    /// The payload is presumably the size of the rejected message (TODO confirm against the
    /// sender implementation).
    #[error("message too large: {0}")]
    MessageTooLarge(usize),
    /// The network (or the channel handle being used) is no longer available.
    #[error("network closed")]
    NetworkClosed,
    /// A link was requested from a peer to itself.
    #[error("not valid to link self")]
    LinkingSelf,
    /// A link that already exists was requested again.
    #[error("link already exists")]
    LinkExists,
    /// An operation referenced a link that does not exist.
    #[error("link missing")]
    LinkMissing,
    /// A link was configured with a success rate outside of `[0, 1]`.
    ///
    /// The payload is presumably the rejected rate (TODO confirm).
    #[error("invalid success rate (must be in [0, 1]): {0}")]
    InvalidSuccessRate(f64),
    /// Sending a frame failed.
    #[error("send_frame failed")]
    SendFrameFailed,
    /// Receiving a frame failed.
    #[error("recv_frame failed")]
    RecvFrameFailed,
    /// Binding failed.
    #[error("bind failed")]
    BindFailed,
    /// Accepting a connection failed.
    #[error("accept failed")]
    AcceptFailed,
    /// Dialing failed.
    #[error("dial failed")]
    DialFailed,
    /// An operation referenced a peer that is not known.
    #[error("peer missing")]
    PeerMissing,
}
179
180pub use ingress::{Control, Link, Manager, Oracle, SocketManager};
181pub use network::{
182    Config, ConnectedPeerProvider, Network, Receiver, Sender, SplitForwarder, SplitOrigin,
183    SplitRouter, SplitSender, SplitTarget, UnlimitedSender,
184};
185
186#[cfg(test)]
187mod tests {
188    use super::*;
189    use crate::{Address, Ingress, Manager, Receiver, Recipients, Sender};
190    use bytes::Bytes;
191    use commonware_cryptography::{
192        ed25519::{self, PrivateKey, PublicKey},
193        Signer as _,
194    };
195    use commonware_macros::select;
196    use commonware_runtime::{deterministic, Clock, Metrics, Quota, Runner, Spawner};
197    use commonware_utils::{hostname, ordered::Map, NZU32};
198    use futures::{channel::mpsc, SinkExt, StreamExt};
199    use rand::Rng;
200    use std::{
201        collections::{BTreeMap, HashMap, HashSet},
202        net::SocketAddr,
203        num::NonZeroU32,
204        time::Duration,
205    };
206
    /// Default rate limit used when registering test channels: the per-second quota is set to
    /// `NonZeroU32::MAX` so that it is high enough to not interfere with normal operation.
    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
209
210    fn simulate_messages(seed: u64, size: usize) -> (String, Vec<usize>) {
211        let executor = deterministic::Runner::seeded(seed);
212        executor.start(|context| async move {
213            // Create simulated network
214            let (network, mut oracle) = Network::new(
215                context.with_label("network"),
216                Config {
217                    max_size: 1024 * 1024,
218                    disconnect_on_block: true,
219                    tracked_peer_sets: None,
220                },
221            );
222
223            // Start network
224            network.start();
225
226            // Register agents
227            let mut agents = BTreeMap::new();
228            let (seen_sender, mut seen_receiver) = mpsc::channel(1024);
229            for i in 0..size {
230                let pk = PrivateKey::from_seed(i as u64).public_key();
231                let (sender, mut receiver) = oracle
232                    .control(pk.clone())
233                    .register(0, TEST_QUOTA)
234                    .await
235                    .unwrap();
236                agents.insert(pk, sender);
237                let mut agent_sender = seen_sender.clone();
238                context
239                    .with_label("agent_receiver")
240                    .spawn(move |_| async move {
241                        for _ in 0..size {
242                            receiver.recv().await.unwrap();
243                        }
244                        agent_sender.send(i).await.unwrap();
245
246                        // Exiting early here tests the case where the recipient end of an agent is dropped
247                    });
248            }
249
250            // Randomly link agents
251            let only_inbound = PrivateKey::from_seed(0).public_key();
252            for agent in agents.keys() {
253                if agent == &only_inbound {
254                    // Test that we can gracefully handle missing links
255                    continue;
256                }
257                for other in agents.keys() {
258                    let result = oracle
259                        .add_link(
260                            agent.clone(),
261                            other.clone(),
262                            Link {
263                                latency: Duration::from_millis(5),
264                                jitter: Duration::from_millis(2),
265                                success_rate: 0.75,
266                            },
267                        )
268                        .await;
269                    if agent == other {
270                        assert!(matches!(result, Err(Error::LinkingSelf)));
271                    } else {
272                        assert!(result.is_ok());
273                    }
274                }
275            }
276
277            // Send messages
278            context
279                .with_label("agent_sender")
280                .spawn(|mut context| async move {
281                    // Sort agents for deterministic output
282                    let keys = agents.keys().collect::<Vec<_>>();
283
284                    // Send messages
285                    loop {
286                        let index = context.gen_range(0..keys.len());
287                        let sender = keys[index];
288                        let msg = format!("hello from {sender:?}");
289                        let msg = Bytes::from(msg);
290                        let mut message_sender = agents.get(sender).unwrap().clone();
291                        let sent = message_sender
292                            .send(Recipients::All, msg.clone(), false)
293                            .await
294                            .unwrap();
295                        if sender == &only_inbound {
296                            assert_eq!(sent.len(), 0);
297                        } else {
298                            assert_eq!(sent.len(), keys.len() - 1);
299                        }
300                    }
301                });
302
303            // Wait for all recipients
304            let mut results = Vec::new();
305            for _ in 0..size {
306                results.push(seen_receiver.next().await.unwrap());
307            }
308            (context.auditor().state(), results)
309        })
310    }
311
312    fn compare_outputs(seeds: u64, size: usize) {
313        // Collect outputs
314        let mut outputs = Vec::new();
315        for seed in 0..seeds {
316            outputs.push(simulate_messages(seed, size));
317        }
318
319        // Confirm outputs are deterministic
320        for seed in 0..seeds {
321            let output = simulate_messages(seed, size);
322            assert_eq!(output, outputs[seed as usize]);
323        }
324    }
325
326    #[test]
327    fn test_determinism() {
328        compare_outputs(25, 25);
329    }
330
331    #[test]
332    fn test_message_too_big() {
333        let executor = deterministic::Runner::default();
334        executor.start(|mut context| async move {
335            // Create simulated network
336            let (network, oracle) = Network::new(
337                context.with_label("network"),
338                Config {
339                    max_size: 1024 * 1024,
340                    disconnect_on_block: true,
341                    tracked_peer_sets: None,
342                },
343            );
344
345            // Start network
346            network.start();
347
348            // Register agents
349            let mut agents = HashMap::new();
350            for i in 0..10 {
351                let pk = PrivateKey::from_seed(i as u64).public_key();
352                let (sender, _) = oracle
353                    .control(pk.clone())
354                    .register(0, TEST_QUOTA)
355                    .await
356                    .unwrap();
357                agents.insert(pk, sender);
358            }
359
360            // Send invalid message
361            let keys = agents.keys().collect::<Vec<_>>();
362            let index = context.gen_range(0..keys.len());
363            let sender = keys[index];
364            let mut message_sender = agents.get(sender).unwrap().clone();
365            let mut msg = vec![0u8; 1024 * 1024 + 1];
366            context.fill(&mut msg[..]);
367            let result = message_sender
368                .send(Recipients::All, msg.into(), false)
369                .await
370                .unwrap_err();
371
372            // Confirm error is correct
373            assert!(matches!(result, Error::MessageTooLarge(_)));
374        });
375    }
376
377    #[test]
378    fn test_linking_self() {
379        let executor = deterministic::Runner::default();
380        executor.start(|context| async move {
381            // Create simulated network
382            let (network, mut oracle) = Network::new(
383                context.with_label("network"),
384                Config {
385                    max_size: 1024 * 1024,
386                    disconnect_on_block: true,
387                    tracked_peer_sets: None,
388                },
389            );
390
391            // Start network
392            network.start();
393
394            // Register agents
395            let pk = PrivateKey::from_seed(0).public_key();
396            oracle
397                .control(pk.clone())
398                .register(0, TEST_QUOTA)
399                .await
400                .unwrap();
401
402            // Attempt to link self
403            let result = oracle
404                .add_link(
405                    pk.clone(),
406                    pk,
407                    Link {
408                        latency: Duration::from_millis(5),
409                        jitter: Duration::from_millis(2),
410                        success_rate: 0.75,
411                    },
412                )
413                .await;
414
415            // Confirm error is correct
416            assert!(matches!(result, Err(Error::LinkingSelf)));
417        });
418    }
419
420    #[test]
421    fn test_duplicate_channel() {
422        let executor = deterministic::Runner::default();
423        executor.start(|context| async move {
424            // Create simulated network
425            let (network, mut oracle) = Network::new(
426                context.with_label("network"),
427                Config {
428                    max_size: 1024 * 1024,
429                    disconnect_on_block: true,
430                    tracked_peer_sets: None,
431                },
432            );
433
434            // Start network
435            network.start();
436
437            // Setup links
438            let my_pk = PrivateKey::from_seed(0).public_key();
439            let other_pk = PrivateKey::from_seed(1).public_key();
440            oracle
441                .add_link(
442                    my_pk.clone(),
443                    other_pk.clone(),
444                    Link {
445                        latency: Duration::from_millis(10),
446                        jitter: Duration::from_millis(1),
447                        success_rate: 1.0,
448                    },
449                )
450                .await
451                .unwrap();
452            oracle
453                .add_link(
454                    other_pk.clone(),
455                    my_pk.clone(),
456                    Link {
457                        latency: Duration::from_millis(10),
458                        jitter: Duration::from_millis(1),
459                        success_rate: 1.0,
460                    },
461                )
462                .await
463                .unwrap();
464
465            // Register channels
466            let (mut my_sender, mut my_receiver) = oracle
467                .control(my_pk.clone())
468                .register(0, TEST_QUOTA)
469                .await
470                .unwrap();
471            let (mut other_sender, mut other_receiver) = oracle
472                .control(other_pk.clone())
473                .register(0, TEST_QUOTA)
474                .await
475                .unwrap();
476
477            // Send messages
478            let msg = Bytes::from("hello");
479            my_sender
480                .send(Recipients::One(other_pk.clone()), msg.clone(), false)
481                .await
482                .unwrap();
483            let (from, message) = other_receiver.recv().await.unwrap();
484            assert_eq!(from, my_pk);
485            assert_eq!(message, msg);
486            other_sender
487                .send(Recipients::One(my_pk.clone()), msg.clone(), false)
488                .await
489                .unwrap();
490            let (from, message) = my_receiver.recv().await.unwrap();
491            assert_eq!(from, other_pk);
492            assert_eq!(message, msg);
493
494            // Update channel
495            let (mut my_sender_2, mut my_receiver_2) = oracle
496                .control(my_pk.clone())
497                .register(0, TEST_QUOTA)
498                .await
499                .unwrap();
500
501            // Send message
502            let msg = Bytes::from("hello again");
503            my_sender_2
504                .send(Recipients::One(other_pk.clone()), msg.clone(), false)
505                .await
506                .unwrap();
507            let (from, message) = other_receiver.recv().await.unwrap();
508            assert_eq!(from, my_pk);
509            assert_eq!(message, msg);
510            other_sender
511                .send(Recipients::One(my_pk.clone()), msg.clone(), false)
512                .await
513                .unwrap();
514            let (from, message) = my_receiver_2.recv().await.unwrap();
515            assert_eq!(from, other_pk);
516            assert_eq!(message, msg);
517
518            // Listen on original
519            assert!(matches!(
520                my_receiver.recv().await,
521                Err(Error::NetworkClosed)
522            ));
523
524            // Send on original
525            assert!(matches!(
526                my_sender
527                    .send(Recipients::One(other_pk.clone()), msg.clone(), false)
528                    .await,
529                Err(Error::NetworkClosed)
530            ));
531        });
532    }
533
534    #[test]
535    fn test_invalid_success_rate() {
536        let executor = deterministic::Runner::default();
537        executor.start(|context| async move {
538            // Create simulated network
539            let (network, mut oracle) = Network::new(
540                context.with_label("network"),
541                Config {
542                    max_size: 1024 * 1024,
543                    disconnect_on_block: true,
544                    tracked_peer_sets: None,
545                },
546            );
547
548            // Start network
549            network.start();
550
551            // Register agents
552            let pk1 = PrivateKey::from_seed(0).public_key();
553            let pk2 = PrivateKey::from_seed(1).public_key();
554            oracle
555                .control(pk1.clone())
556                .register(0, TEST_QUOTA)
557                .await
558                .unwrap();
559            oracle
560                .control(pk2.clone())
561                .register(0, TEST_QUOTA)
562                .await
563                .unwrap();
564
565            // Attempt to link with invalid success rate
566            let result = oracle
567                .add_link(
568                    pk1,
569                    pk2,
570                    Link {
571                        latency: Duration::from_millis(5),
572                        jitter: Duration::from_millis(2),
573                        success_rate: 1.5,
574                    },
575                )
576                .await;
577
578            // Confirm error is correct
579            assert!(matches!(result, Err(Error::InvalidSuccessRate(_))));
580        });
581    }
582
583    #[test]
584    fn test_add_link_before_channel_registration() {
585        let executor = deterministic::Runner::default();
586        executor.start(|context| async move {
587            // Create simulated network
588            let (network, mut oracle) = Network::new(
589                context.with_label("network"),
590                Config {
591                    max_size: 1024 * 1024,
592                    disconnect_on_block: true,
593                    tracked_peer_sets: Some(3),
594                },
595            );
596            network.start();
597
598            // Create peers
599            let pk1 = PrivateKey::from_seed(0).public_key();
600            let pk2 = PrivateKey::from_seed(1).public_key();
601
602            // Register peer set
603            let mut manager = oracle.manager();
604            manager
605                .update(0, vec![pk1.clone(), pk2.clone()].try_into().unwrap())
606                .await;
607
608            // Add link
609            oracle
610                .add_link(
611                    pk1.clone(),
612                    pk2.clone(),
613                    Link {
614                        latency: Duration::ZERO,
615                        jitter: Duration::ZERO,
616                        success_rate: 1.0,
617                    },
618                )
619                .await
620                .unwrap();
621
622            // Register channels
623            let (mut sender1, _receiver1) = oracle
624                .control(pk1.clone())
625                .register(0, TEST_QUOTA)
626                .await
627                .unwrap();
628            let (_, mut receiver2) = oracle
629                .control(pk2.clone())
630                .register(0, TEST_QUOTA)
631                .await
632                .unwrap();
633
634            // Send message
635            let msg1 = Bytes::from("link-before-register-1");
636            sender1
637                .send(Recipients::One(pk2.clone()), msg1.clone(), false)
638                .await
639                .unwrap();
640            let (from, received) = receiver2.recv().await.unwrap();
641            assert_eq!(from, pk1);
642            assert_eq!(received, msg1);
643        });
644    }
645
646    #[test]
647    fn test_simple_message_delivery() {
648        let executor = deterministic::Runner::default();
649        executor.start(|context| async move {
650            // Create simulated network
651            let (network, mut oracle) = Network::new(
652                context.with_label("network"),
653                Config {
654                    max_size: 1024 * 1024,
655                    disconnect_on_block: true,
656                    tracked_peer_sets: None,
657                },
658            );
659
660            // Start network
661            network.start();
662
663            // Register agents
664            let pk1 = PrivateKey::from_seed(0).public_key();
665            let pk2 = PrivateKey::from_seed(1).public_key();
666            let (mut sender1, mut receiver1) = oracle
667                .control(pk1.clone())
668                .register(0, TEST_QUOTA)
669                .await
670                .unwrap();
671            let (mut sender2, mut receiver2) = oracle
672                .control(pk2.clone())
673                .register(0, TEST_QUOTA)
674                .await
675                .unwrap();
676
677            // Register unused channels
678            let _ = oracle
679                .control(pk1.clone())
680                .register(1, TEST_QUOTA)
681                .await
682                .unwrap();
683            let _ = oracle
684                .control(pk2.clone())
685                .register(2, TEST_QUOTA)
686                .await
687                .unwrap();
688
689            // Link agents
690            oracle
691                .add_link(
692                    pk1.clone(),
693                    pk2.clone(),
694                    Link {
695                        latency: Duration::from_millis(5),
696                        jitter: Duration::from_millis(2),
697                        success_rate: 1.0,
698                    },
699                )
700                .await
701                .unwrap();
702            oracle
703                .add_link(
704                    pk2.clone(),
705                    pk1.clone(),
706                    Link {
707                        latency: Duration::from_millis(5),
708                        jitter: Duration::from_millis(2),
709                        success_rate: 1.0,
710                    },
711                )
712                .await
713                .unwrap();
714
715            // Send messages
716            let msg1 = Bytes::from("hello from pk1");
717            let msg2 = Bytes::from("hello from pk2");
718            sender1
719                .send(Recipients::One(pk2.clone()), msg1.clone(), false)
720                .await
721                .unwrap();
722            sender2
723                .send(Recipients::One(pk1.clone()), msg2.clone(), false)
724                .await
725                .unwrap();
726
727            // Confirm message delivery
728            let (sender, message) = receiver1.recv().await.unwrap();
729            assert_eq!(sender, pk2);
730            assert_eq!(message, msg2);
731            let (sender, message) = receiver2.recv().await.unwrap();
732            assert_eq!(sender, pk1);
733            assert_eq!(message, msg1);
734        });
735    }
736
737    #[test]
738    fn test_send_wrong_channel() {
739        let executor = deterministic::Runner::default();
740        executor.start(|context| async move {
741            // Create simulated network
742            let (network, mut oracle) = Network::new(
743                context.with_label("network"),
744                Config {
745                    max_size: 1024 * 1024,
746                    disconnect_on_block: true,
747                    tracked_peer_sets: None,
748                },
749            );
750
751            // Start network
752            network.start();
753
754            // Register agents
755            let pk1 = PrivateKey::from_seed(0).public_key();
756            let pk2 = PrivateKey::from_seed(1).public_key();
757            let (mut sender1, _) = oracle
758                .control(pk1.clone())
759                .register(0, TEST_QUOTA)
760                .await
761                .unwrap();
762            let (_, mut receiver2) = oracle
763                .control(pk2.clone())
764                .register(1, TEST_QUOTA)
765                .await
766                .unwrap();
767
768            // Link agents
769            oracle
770                .add_link(
771                    pk1,
772                    pk2.clone(),
773                    Link {
774                        latency: Duration::from_millis(5),
775                        jitter: Duration::ZERO,
776                        success_rate: 1.0,
777                    },
778                )
779                .await
780                .unwrap();
781
782            // Send message
783            let msg = Bytes::from("hello from pk1");
784            sender1
785                .send(Recipients::One(pk2), msg, false)
786                .await
787                .unwrap();
788
789            // Confirm no message delivery
790            select! {
791                _ = receiver2.recv() => {
792                    panic!("unexpected message");
793                },
794                _ = context.sleep(Duration::from_secs(1)) => {},
795            }
796        });
797    }
798
799    #[test]
800    fn test_dynamic_peers() {
801        let executor = deterministic::Runner::default();
802        executor.start(|context| async move {
803            // Create simulated network
804            let (network, mut oracle) = Network::new(
805                context.with_label("network"),
806                Config {
807                    max_size: 1024 * 1024,
808                    disconnect_on_block: true,
809                    tracked_peer_sets: None,
810                },
811            );
812
813            // Start network
814            network.start();
815
816            // Define agents
817            let pk1 = PrivateKey::from_seed(0).public_key();
818            let pk2 = PrivateKey::from_seed(1).public_key();
819            let (mut sender1, mut receiver1) = oracle
820                .control(pk1.clone())
821                .register(0, TEST_QUOTA)
822                .await
823                .unwrap();
824            let (mut sender2, mut receiver2) = oracle
825                .control(pk2.clone())
826                .register(0, TEST_QUOTA)
827                .await
828                .unwrap();
829
830            // Link agents
831            oracle
832                .add_link(
833                    pk1.clone(),
834                    pk2.clone(),
835                    Link {
836                        latency: Duration::from_millis(5),
837                        jitter: Duration::from_millis(2),
838                        success_rate: 1.0,
839                    },
840                )
841                .await
842                .unwrap();
843            oracle
844                .add_link(
845                    pk2.clone(),
846                    pk1.clone(),
847                    Link {
848                        latency: Duration::from_millis(5),
849                        jitter: Duration::from_millis(2),
850                        success_rate: 1.0,
851                    },
852                )
853                .await
854                .unwrap();
855
856            // Send messages
857            let msg1 = Bytes::from("attempt 1: hello from pk1");
858            let msg2 = Bytes::from("attempt 1: hello from pk2");
859            sender1
860                .send(Recipients::One(pk2.clone()), msg1.clone(), false)
861                .await
862                .unwrap();
863            sender2
864                .send(Recipients::One(pk1.clone()), msg2.clone(), false)
865                .await
866                .unwrap();
867
868            // Confirm message delivery
869            let (sender, message) = receiver1.recv().await.unwrap();
870            assert_eq!(sender, pk2);
871            assert_eq!(message, msg2);
872            let (sender, message) = receiver2.recv().await.unwrap();
873            assert_eq!(sender, pk1);
874            assert_eq!(message, msg1);
875        });
876    }
877
878    #[test]
879    fn test_dynamic_links() {
880        let executor = deterministic::Runner::default();
881        executor.start(|context| async move {
882            // Create simulated network
883            let (network, mut oracle) = Network::new(
884                context.with_label("network"),
885                Config {
886                    max_size: 1024 * 1024,
887                    disconnect_on_block: true,
888                    tracked_peer_sets: None,
889                },
890            );
891
892            // Start network
893            network.start();
894
895            // Register agents
896            let pk1 = PrivateKey::from_seed(0).public_key();
897            let pk2 = PrivateKey::from_seed(1).public_key();
898            let (mut sender1, mut receiver1) = oracle
899                .control(pk1.clone())
900                .register(0, TEST_QUOTA)
901                .await
902                .unwrap();
903            let (mut sender2, mut receiver2) = oracle
904                .control(pk2.clone())
905                .register(0, TEST_QUOTA)
906                .await
907                .unwrap();
908
909            // Send messages
910            let msg1 = Bytes::from("attempt 1: hello from pk1");
911            let msg2 = Bytes::from("attempt 1: hello from pk2");
912            sender1
913                .send(Recipients::One(pk2.clone()), msg1.clone(), false)
914                .await
915                .unwrap();
916            sender2
917                .send(Recipients::One(pk1.clone()), msg2.clone(), false)
918                .await
919                .unwrap();
920
921            // Confirm no message delivery
922            select! {
923                _ = receiver1.recv() => {
924                    panic!("unexpected message");
925                },
926                _ = receiver2.recv() => {
927                    panic!("unexpected message");
928                },
929                _ = context.sleep(Duration::from_secs(1)) => {},
930            }
931
932            // Link agents
933            oracle
934                .add_link(
935                    pk1.clone(),
936                    pk2.clone(),
937                    Link {
938                        latency: Duration::from_millis(5),
939                        jitter: Duration::from_millis(2),
940                        success_rate: 1.0,
941                    },
942                )
943                .await
944                .unwrap();
945            oracle
946                .add_link(
947                    pk2.clone(),
948                    pk1.clone(),
949                    Link {
950                        latency: Duration::from_millis(5),
951                        jitter: Duration::from_millis(2),
952                        success_rate: 1.0,
953                    },
954                )
955                .await
956                .unwrap();
957
958            // Send messages
959            let msg1 = Bytes::from("attempt 2: hello from pk1");
960            let msg2 = Bytes::from("attempt 2: hello from pk2");
961            sender1
962                .send(Recipients::One(pk2.clone()), msg1.clone(), false)
963                .await
964                .unwrap();
965            sender2
966                .send(Recipients::One(pk1.clone()), msg2.clone(), false)
967                .await
968                .unwrap();
969
970            // Confirm message delivery
971            let (sender, message) = receiver1.recv().await.unwrap();
972            assert_eq!(sender, pk2);
973            assert_eq!(message, msg2);
974            let (sender, message) = receiver2.recv().await.unwrap();
975            assert_eq!(sender, pk1);
976            assert_eq!(message, msg1);
977
978            // Remove links
979            oracle.remove_link(pk1.clone(), pk2.clone()).await.unwrap();
980            oracle.remove_link(pk2.clone(), pk1.clone()).await.unwrap();
981
982            // Send messages
983            let msg1 = Bytes::from("attempt 3: hello from pk1");
984            let msg2 = Bytes::from("attempt 3: hello from pk2");
985            sender1
986                .send(Recipients::One(pk2.clone()), msg1.clone(), false)
987                .await
988                .unwrap();
989            sender2
990                .send(Recipients::One(pk1.clone()), msg2.clone(), false)
991                .await
992                .unwrap();
993
994            // Confirm no message delivery
995            select! {
996                _ = receiver1.recv() => {
997                    panic!("unexpected message");
998                },
999                _ = receiver2.recv() => {
1000                    panic!("unexpected message");
1001                },
1002                _ = context.sleep(Duration::from_secs(1)) => {},
1003            }
1004
1005            // Remove non-existent links
1006            let result = oracle.remove_link(pk1, pk2).await;
1007            assert!(matches!(result, Err(Error::LinkMissing)));
1008        });
1009    }
1010
1011    async fn test_bandwidth_between_peers(
1012        context: &mut deterministic::Context,
1013        oracle: &mut Oracle<PublicKey, deterministic::Context>,
1014        sender_bps: Option<usize>,
1015        receiver_bps: Option<usize>,
1016        message_size: usize,
1017        expected_duration_ms: u64,
1018    ) {
1019        // Create two agents
1020        let pk1 = PrivateKey::from_seed(context.gen::<u64>()).public_key();
1021        let pk2 = PrivateKey::from_seed(context.gen::<u64>()).public_key();
1022        let (mut sender, _) = oracle
1023            .control(pk1.clone())
1024            .register(0, TEST_QUOTA)
1025            .await
1026            .unwrap();
1027        let (_, mut receiver) = oracle
1028            .control(pk2.clone())
1029            .register(0, TEST_QUOTA)
1030            .await
1031            .unwrap();
1032
1033        // Set bandwidth limits
1034        oracle
1035            .limit_bandwidth(pk1.clone(), sender_bps, None)
1036            .await
1037            .unwrap();
1038        oracle
1039            .limit_bandwidth(pk2.clone(), None, receiver_bps)
1040            .await
1041            .unwrap();
1042
1043        // Link the two agents
1044        oracle
1045            .add_link(
1046                pk1.clone(),
1047                pk2.clone(),
1048                Link {
1049                    // No latency so it doesn't interfere with bandwidth delay calculation
1050                    latency: Duration::ZERO,
1051                    jitter: Duration::ZERO,
1052                    success_rate: 1.0,
1053                },
1054            )
1055            .await
1056            .unwrap();
1057
1058        // Send a message from agent 1 to 2
1059        let msg = Bytes::from(vec![42u8; message_size]);
1060        let start = context.current();
1061        sender
1062            .send(Recipients::One(pk2.clone()), msg.clone(), true)
1063            .await
1064            .unwrap();
1065
1066        // Measure how long it takes for agent 2 to receive the message
1067        let (origin, received) = receiver.recv().await.unwrap();
1068        let elapsed = context.current().duration_since(start).unwrap();
1069
1070        assert_eq!(origin, pk1);
1071        assert_eq!(received, msg);
1072        assert!(
1073            elapsed >= Duration::from_millis(expected_duration_ms),
1074            "Message arrived too quickly: {elapsed:?} (expected >= {expected_duration_ms}ms)"
1075        );
1076        assert!(
1077            elapsed < Duration::from_millis(expected_duration_ms + 100),
1078            "Message took too long: {elapsed:?} (expected ~{expected_duration_ms}ms)"
1079        );
1080    }
1081
1082    #[test]
1083    fn test_bandwidth() {
1084        let executor = deterministic::Runner::default();
1085        executor.start(|mut context| async move {
1086            let (network, mut oracle) = Network::new(
1087                context.with_label("network"),
1088                Config {
1089                    max_size: 1024 * 1024,
1090                    disconnect_on_block: true,
1091                    tracked_peer_sets: None,
1092                },
1093            );
1094            network.start();
1095
1096            // Both sender and receiver have the same bandiwdth (1000 B/s)
1097            // 500 bytes at 1000 B/s = 0.5 seconds
1098            test_bandwidth_between_peers(
1099                &mut context,
1100                &mut oracle,
1101                Some(1000), // sender egress
1102                Some(1000), // receiver ingress
1103                500,        // message size
1104                500,        // expected duration in ms
1105            )
1106            .await;
1107
1108            // Sender has lower bandwidth (500 B/s) than receiver (2000 B/s)
1109            // Should be limited by sender's 500 B/s
1110            // 250 bytes at 500 B/s = 0.5 seconds
1111            test_bandwidth_between_peers(
1112                &mut context,
1113                &mut oracle,
1114                Some(500),  // sender egress
1115                Some(2000), // receiver ingress
1116                250,        // message size
1117                500,        // expected duration in ms
1118            )
1119            .await;
1120
1121            // Sender has higher bandwidth (2000 B/s) than receiver (500 B/s)
1122            // Should be limited by receiver's 500 B/s
1123            // 250 bytes at 500 B/s = 0.5 seconds
1124            test_bandwidth_between_peers(
1125                &mut context,
1126                &mut oracle,
1127                Some(2000), // sender egress
1128                Some(500),  // receiver ingress
1129                250,        // message size
1130                500,        // expected duration in ms
1131            )
1132            .await;
1133
1134            // Unlimited sender, limited receiver
1135            // Should be limited by receiver's 1000 B/s
1136            // 500 bytes at 1000 B/s = 0.5 seconds
1137            test_bandwidth_between_peers(
1138                &mut context,
1139                &mut oracle,
1140                None,       // sender egress (unlimited)
1141                Some(1000), // receiver ingress
1142                500,        // message size
1143                500,        // expected duration in ms
1144            )
1145            .await;
1146
1147            // Limited sender, unlimited receiver
1148            // Should be limited by sender's 1000 B/s
1149            // 500 bytes at 1000 B/s = 0.5 seconds
1150            test_bandwidth_between_peers(
1151                &mut context,
1152                &mut oracle,
1153                Some(1000), // sender egress
1154                None,       // receiver ingress (unlimited)
1155                500,        // message size
1156                500,        // expected duration in ms
1157            )
1158            .await;
1159
1160            // Unlimited sender, unlimited receiver
1161            // Delivery should be (almost) instant
1162            test_bandwidth_between_peers(
1163                &mut context,
1164                &mut oracle,
1165                None, // sender egress (unlimited)
1166                None, // receiver ingress (unlimited)
1167                500,  // message size
1168                0,    // expected duration in ms
1169            )
1170            .await;
1171        });
1172    }
1173
1174    #[test]
1175    fn test_bandwidth_contention() {
1176        // Test bandwidth contention with many peers (one-to-many and many-to-one scenarios)
1177        let executor = deterministic::Runner::default();
1178        executor.start(|context| async move {
1179            let (network, mut oracle) = Network::new(
1180                context.with_label("network"),
1181                Config {
1182                    max_size: 1024 * 1024,
1183                    disconnect_on_block: true,
1184                    tracked_peer_sets: None,
1185                },
1186            );
1187            network.start();
1188
1189            // Configuration
1190            const NUM_PEERS: usize = 100;
1191            const MESSAGE_SIZE: usize = 1000; // 1KB per message
1192            const EFFECTIVE_BPS: usize = 10_000; // 10KB/s egress/ingress per peer
1193
1194            // Create peers
1195            let mut peers = Vec::with_capacity(NUM_PEERS + 1);
1196            let mut senders = Vec::with_capacity(NUM_PEERS + 1);
1197            let mut receivers = Vec::with_capacity(NUM_PEERS + 1);
1198
1199            // Create the main peer (index 0) and 100 other peers
1200            for i in 0..=NUM_PEERS {
1201                let pk = PrivateKey::from_seed(i as u64).public_key();
1202                let (sender, receiver) = oracle
1203                    .control(pk.clone())
1204                    .register(0, TEST_QUOTA)
1205                    .await
1206                    .unwrap();
1207                peers.push(pk);
1208                senders.push(sender);
1209                receivers.push(receiver);
1210            }
1211
1212            // Set bandwidth limits for all peers
1213            for pk in &peers {
1214                oracle
1215                    .limit_bandwidth(pk.clone(), Some(EFFECTIVE_BPS), Some(EFFECTIVE_BPS))
1216                    .await
1217                    .unwrap();
1218            }
1219
1220            // Link all peers to the main peer (peers[0]) with zero latency
1221            for peer in peers.iter().skip(1) {
1222                oracle
1223                    .add_link(
1224                        peer.clone(),
1225                        peers[0].clone(),
1226                        Link {
1227                            latency: Duration::ZERO,
1228                            jitter: Duration::ZERO,
1229                            success_rate: 1.0,
1230                        },
1231                    )
1232                    .await
1233                    .unwrap();
1234                oracle
1235                    .add_link(
1236                        peers[0].clone(),
1237                        peer.clone(),
1238                        Link {
1239                            latency: Duration::ZERO,
1240                            jitter: Duration::ZERO,
1241                            success_rate: 1.0,
1242                        },
1243                    )
1244                    .await
1245                    .unwrap();
1246            }
1247
1248            // One-to-many (main peer sends to all others). Verifies that bandwidth limits
1249            // are properly enforced when sending to multiple recipients
1250            let start = context.current();
1251
1252            // Send message to all peers concurrently
1253            // and wait for all sends to be acknowledged
1254            let msg = Bytes::from(vec![0u8; MESSAGE_SIZE]);
1255            for peer in peers.iter().skip(1) {
1256                senders[0]
1257                    .send(Recipients::One(peer.clone()), msg.clone(), true)
1258                    .await
1259                    .unwrap();
1260            }
1261
1262            // Verify all messages are received
1263            for receiver in receivers.iter_mut().skip(1) {
1264                let (origin, received) = receiver.recv().await.unwrap();
1265                assert_eq!(origin, peers[0]);
1266                assert_eq!(received.len(), MESSAGE_SIZE);
1267            }
1268
1269            let elapsed = context.current().duration_since(start).unwrap();
1270
1271            // Calculate expected time
1272            let expected_ms = (NUM_PEERS * MESSAGE_SIZE * 1000) / EFFECTIVE_BPS;
1273
1274            assert!(
1275                elapsed >= Duration::from_millis(expected_ms as u64),
1276                "One-to-many completed too quickly: {elapsed:?} (expected >= {expected_ms}ms)"
1277            );
1278            assert!(
1279                elapsed < Duration::from_millis((expected_ms as u64) + 500),
1280                "One-to-many took too long: {elapsed:?} (expected ~{expected_ms}ms)"
1281            );
1282
1283            // Many-to-one (all peers send to the main peer)
1284            let start = context.current();
1285
1286            // Each peer sends a message to the main peer concurrently and we wait for all
1287            // sends to be acknowledged
1288            let msg = Bytes::from(vec![0; MESSAGE_SIZE]);
1289            for mut sender in senders.into_iter().skip(1) {
1290                sender
1291                    .send(Recipients::One(peers[0].clone()), msg.clone(), true)
1292                    .await
1293                    .unwrap();
1294            }
1295
1296            // Collect all messages at the main peer
1297            let mut received_from = HashSet::new();
1298            for _ in 1..=NUM_PEERS {
1299                let (origin, received) = receivers[0].recv().await.unwrap();
1300                assert_eq!(received.len(), MESSAGE_SIZE);
1301                assert!(
1302                    received_from.insert(origin.clone()),
1303                    "Received duplicate from {origin:?}"
1304                );
1305            }
1306
1307            let elapsed = context.current().duration_since(start).unwrap();
1308
1309            // Calculate expected time
1310            let expected_ms = (NUM_PEERS * MESSAGE_SIZE * 1000) / EFFECTIVE_BPS;
1311
1312            assert!(
1313                elapsed >= Duration::from_millis(expected_ms as u64),
1314                "Many-to-one completed too quickly: {elapsed:?} (expected >= {expected_ms}ms)"
1315            );
1316            assert!(
1317                elapsed < Duration::from_millis((expected_ms as u64) + 500),
1318                "Many-to-one took too long: {elapsed:?} (expected ~{expected_ms}ms)"
1319            );
1320
1321            // Verify we received from all peers
1322            assert_eq!(received_from.len(), NUM_PEERS);
1323            for peer in peers.iter().skip(1) {
1324                assert!(received_from.contains(peer));
1325            }
1326        });
1327    }
1328
1329    #[test]
1330    fn test_message_ordering() {
1331        // Test that messages arrive in order even with variable latency
1332        let executor = deterministic::Runner::default();
1333        executor.start(|context| async move {
1334            let (network, mut oracle) = Network::new(
1335                context.with_label("network"),
1336                Config {
1337                    max_size: 1024 * 1024,
1338                    disconnect_on_block: true,
1339                    tracked_peer_sets: None,
1340                },
1341            );
1342            network.start();
1343
1344            // Register agents
1345            let pk1 = PrivateKey::from_seed(1).public_key();
1346            let pk2 = PrivateKey::from_seed(2).public_key();
1347            let (mut sender, _) = oracle
1348                .control(pk1.clone())
1349                .register(0, TEST_QUOTA)
1350                .await
1351                .unwrap();
1352            let (_, mut receiver) = oracle
1353                .control(pk2.clone())
1354                .register(0, TEST_QUOTA)
1355                .await
1356                .unwrap();
1357
1358            // Link agents with high jitter to create variable delays
1359            oracle
1360                .add_link(
1361                    pk1.clone(),
1362                    pk2.clone(),
1363                    Link {
1364                        latency: Duration::from_millis(50),
1365                        jitter: Duration::from_millis(40),
1366                        success_rate: 1.0,
1367                    },
1368                )
1369                .await
1370                .unwrap();
1371
1372            // Send multiple messages that should arrive in order
1373            let messages = vec![
1374                Bytes::from("message 1"),
1375                Bytes::from("message 2"),
1376                Bytes::from("message 3"),
1377                Bytes::from("message 4"),
1378                Bytes::from("message 5"),
1379            ];
1380
1381            for msg in messages.clone() {
1382                sender
1383                    .send(Recipients::One(pk2.clone()), msg, true)
1384                    .await
1385                    .unwrap();
1386            }
1387
1388            // Receive messages and verify they arrive in order
1389            for expected_msg in messages {
1390                let (origin, received_msg) = receiver.recv().await.unwrap();
1391                assert_eq!(origin, pk1);
1392                assert_eq!(received_msg, expected_msg);
1393            }
1394        })
1395    }
1396
1397    #[test]
1398    fn test_high_latency_message_blocks_followup() {
1399        let executor = deterministic::Runner::default();
1400        executor.start(|context| async move {
1401            let (network, mut oracle) = Network::new(
1402                context.with_label("network"),
1403                Config {
1404                    max_size: 1024 * 1024,
1405                    disconnect_on_block: true,
1406                    tracked_peer_sets: None,
1407                },
1408            );
1409            network.start();
1410
1411            let pk1 = PrivateKey::from_seed(1).public_key();
1412            let pk2 = PrivateKey::from_seed(2).public_key();
1413            let (mut sender, _) = oracle.control(pk1.clone()).register(0, TEST_QUOTA).await.unwrap();
1414            let (_, mut receiver) = oracle.control(pk2.clone()).register(0, TEST_QUOTA).await.unwrap();
1415
1416            const BPS: usize = 1_000;
1417            oracle
1418                .limit_bandwidth(pk1.clone(), Some(BPS), None)
1419                .await
1420                .unwrap();
1421            oracle
1422                .limit_bandwidth(pk2.clone(), None, Some(BPS))
1423                .await
1424                .unwrap();
1425
1426            // Send slow message
1427            oracle
1428                .add_link(
1429                    pk1.clone(),
1430                    pk2.clone(),
1431                    Link {
1432                        latency: Duration::from_millis(5_000),
1433                        jitter: Duration::ZERO,
1434                        success_rate: 1.0,
1435                    },
1436                )
1437                .await
1438                .unwrap();
1439
1440            let slow = Bytes::from(vec![0u8; 1_000]);
1441            sender
1442                .send(Recipients::One(pk2.clone()), slow.clone(), true)
1443                .await
1444                .unwrap();
1445
1446            // Update link
1447            oracle.remove_link(pk1.clone(), pk2.clone()).await.unwrap();
1448            oracle
1449                .add_link(
1450                    pk1.clone(),
1451                    pk2.clone(),
1452                    Link {
1453                        latency: Duration::from_millis(1),
1454                        jitter: Duration::ZERO,
1455                        success_rate: 1.0,
1456                    },
1457                )
1458                .await
1459                .unwrap();
1460
1461            // Send fast message
1462            let fast = Bytes::from(vec![1u8; 1_000]);
1463            sender
1464                .send(Recipients::One(pk2.clone()), fast.clone(), true)
1465                .await
1466                .unwrap();
1467
1468            let start = context.current();
1469            let (origin1, message1) = receiver.recv().await.unwrap();
1470            assert_eq!(origin1, pk1);
1471            assert_eq!(message1, slow);
1472            let first_elapsed = context.current().duration_since(start).unwrap();
1473
1474            let (origin2, message2) = receiver.recv().await.unwrap();
1475            let second_elapsed = context.current().duration_since(start).unwrap();
1476            assert_eq!(origin2, pk1);
1477            assert_eq!(message2, fast);
1478
1479            let egress_time = Duration::from_secs(1);
1480            let slow_latency = Duration::from_millis(5_000);
1481            let expected_first = egress_time + slow_latency;
1482            let tolerance = Duration::from_millis(10);
1483            assert!(
1484                first_elapsed >= expected_first.saturating_sub(tolerance)
1485                    && first_elapsed <= expected_first + tolerance,
1486                "slow message arrived outside expected window: {first_elapsed:?} (expected {expected_first:?} ± {tolerance:?})"
1487            );
1488            assert!(
1489                second_elapsed >= first_elapsed,
1490                "fast message arrived before slow transmission completed"
1491            );
1492
1493            let arrival_gap = second_elapsed
1494                .checked_sub(first_elapsed)
1495                .expect("timestamps ordered");
1496            assert!(
1497                arrival_gap >= egress_time.saturating_sub(tolerance)
1498                    && arrival_gap <= egress_time + tolerance,
1499                "next arrival deviated from transmit duration (gap = {arrival_gap:?}, expected {egress_time:?} ± {tolerance:?})"
1500            );
1501        })
1502    }
1503
1504    #[test]
1505    fn test_many_to_one_bandwidth_sharing() {
1506        let executor = deterministic::Runner::default();
1507        executor.start(|context| async move {
1508            let (network, mut oracle) = Network::new(
1509                context.with_label("network"),
1510                Config {
1511                    max_size: 1024 * 1024,
1512                    disconnect_on_block: true,
1513                    tracked_peer_sets: None,
1514                },
1515            );
1516            network.start();
1517
1518            // Create 10 senders and 1 receiver
1519            let mut senders = Vec::new();
1520            let mut sender_txs = Vec::new();
1521            for i in 0..10 {
1522                let sender = ed25519::PrivateKey::from_seed(i).public_key();
1523                senders.push(sender.clone());
1524                let (tx, _) = oracle
1525                    .control(sender.clone())
1526                    .register(0, TEST_QUOTA)
1527                    .await
1528                    .unwrap();
1529                sender_txs.push(tx);
1530
1531                // Each sender has 10KB/s egress
1532                oracle
1533                    .limit_bandwidth(sender.clone(), Some(10_000), None)
1534                    .await
1535                    .unwrap();
1536            }
1537
1538            let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1539            let (_, mut receiver_rx) = oracle
1540                .control(receiver.clone())
1541                .register(0, TEST_QUOTA)
1542                .await
1543                .unwrap();
1544
1545            // Receiver has 100KB/s ingress
1546            oracle
1547                .limit_bandwidth(receiver.clone(), None, Some(100_000))
1548                .await
1549                .unwrap();
1550
1551            // Add links with no latency
1552            for sender in &senders {
1553                oracle
1554                    .add_link(
1555                        sender.clone(),
1556                        receiver.clone(),
1557                        Link {
1558                            latency: Duration::ZERO,
1559                            jitter: Duration::ZERO,
1560                            success_rate: 1.0,
1561                        },
1562                    )
1563                    .await
1564                    .unwrap();
1565            }
1566
1567            let start = context.current();
1568
1569            // All senders send 10KB simultaneously
1570            for (i, mut tx) in sender_txs.into_iter().enumerate() {
1571                let receiver_clone = receiver.clone();
1572                let msg = Bytes::from(vec![i as u8; 10_000]);
1573                tx.send(Recipients::One(receiver_clone), msg, true)
1574                    .await
1575                    .unwrap();
1576            }
1577
1578            // All 10 messages should be received at ~1s
1579            // (100KB total data at 100KB/s aggregate bandwidth)
1580            for i in 0..10 {
1581                let (_, _msg) = receiver_rx.recv().await.unwrap();
1582                let recv_time = context.current().duration_since(start).unwrap();
1583
1584                // Messages should all complete around 1s
1585                assert!(
1586                    recv_time >= Duration::from_millis(950)
1587                        && recv_time <= Duration::from_millis(1100),
1588                    "Message {i} received at {recv_time:?}, expected ~1s",
1589                );
1590            }
1591        });
1592    }
1593
1594    #[test]
1595    fn test_one_to_many_fast_sender() {
1596        // Test that 1 fast sender (100KB/s) sending to 10 receivers (10KB/s each)
1597        // should complete all sends in ~1s and all messages received in ~1s
1598        let executor = deterministic::Runner::default();
1599        executor.start(|context| async move {
1600            let (network, mut oracle) = Network::new(
1601                context.with_label("network"),
1602                Config {
1603                    max_size: 1024 * 1024,
1604                    disconnect_on_block: true,
1605                    tracked_peer_sets: None,
1606                },
1607            );
1608            network.start();
1609
1610            // Create fast sender
1611            let sender = ed25519::PrivateKey::from_seed(0).public_key();
1612            let (sender_tx, _) = oracle
1613                .control(sender.clone())
1614                .register(0, TEST_QUOTA)
1615                .await
1616                .unwrap();
1617
1618            // Sender has 100KB/s egress
1619            oracle
1620                .limit_bandwidth(sender.clone(), Some(100_000), None)
1621                .await
1622                .unwrap();
1623
1624            // Create 10 receivers
1625            let mut receivers = Vec::new();
1626            let mut receiver_rxs = Vec::new();
1627            for i in 0..10 {
1628                let receiver = ed25519::PrivateKey::from_seed(i + 1).public_key();
1629                receivers.push(receiver.clone());
1630                let (_, rx) = oracle
1631                    .control(receiver.clone())
1632                    .register(0, TEST_QUOTA)
1633                    .await
1634                    .unwrap();
1635                receiver_rxs.push(rx);
1636
1637                // Each receiver has 10KB/s ingress
1638                oracle
1639                    .limit_bandwidth(receiver.clone(), None, Some(10_000))
1640                    .await
1641                    .unwrap();
1642
1643                // Add link with no latency
1644                oracle
1645                    .add_link(
1646                        sender.clone(),
1647                        receiver.clone(),
1648                        Link {
1649                            latency: Duration::ZERO,
1650                            jitter: Duration::ZERO,
1651                            success_rate: 1.0,
1652                        },
1653                    )
1654                    .await
1655                    .unwrap();
1656            }
1657
1658            let start = context.current();
1659
1660            // Send 10KB to each receiver (100KB total)
1661            for (i, receiver) in receivers.iter().enumerate() {
1662                let mut sender_tx = sender_tx.clone();
1663                let receiver_clone = receiver.clone();
1664                let msg = Bytes::from(vec![i as u8; 10_000]);
1665                sender_tx
1666                    .send(Recipients::One(receiver_clone), msg, true)
1667                    .await
1668                    .unwrap();
1669            }
1670
1671            // Each receiver should receive their 10KB message in ~1s (10KB at 10KB/s)
1672            for (i, mut rx) in receiver_rxs.into_iter().enumerate() {
1673                let (_, msg) = rx.recv().await.unwrap();
1674                assert_eq!(msg[0], i as u8);
1675                let recv_time = context.current().duration_since(start).unwrap();
1676
1677                // All messages should be received around 1s
1678                assert!(
1679                    recv_time >= Duration::from_millis(950)
1680                        && recv_time <= Duration::from_millis(1100),
1681                    "Receiver {i} received at {recv_time:?}, expected ~1s",
1682                );
1683            }
1684        });
1685    }
1686
1687    #[test]
1688    fn test_many_slow_senders_to_fast_receiver() {
1689        // Test that 10 slow senders (1KB/s each) sending to a fast receiver (10KB/s)
1690        // should complete all transfers in ~1s
1691        let executor = deterministic::Runner::default();
1692        executor.start(|context| async move {
1693            let (network, mut oracle) = Network::new(
1694                context.with_label("network"),
1695                Config {
1696                    max_size: 1024 * 1024,
1697                    disconnect_on_block: true,
1698                    tracked_peer_sets: None,
1699                },
1700            );
1701            network.start();
1702
1703            // Create 10 slow senders
1704            let mut senders = Vec::new();
1705            let mut sender_txs = Vec::new();
1706            for i in 0..10 {
1707                let sender = ed25519::PrivateKey::from_seed(i).public_key();
1708                senders.push(sender.clone());
1709                let (tx, _) = oracle
1710                    .control(sender.clone())
1711                    .register(0, TEST_QUOTA)
1712                    .await
1713                    .unwrap();
1714                sender_txs.push(tx);
1715
1716                // Each sender has 1KB/s egress (slow)
1717                oracle
1718                    .limit_bandwidth(sender.clone(), Some(1_000), None)
1719                    .await
1720                    .unwrap();
1721            }
1722
1723            // Create fast receiver
1724            let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1725            let (_, mut receiver_rx) = oracle
1726                .control(receiver.clone())
1727                .register(0, TEST_QUOTA)
1728                .await
1729                .unwrap();
1730
1731            // Receiver has 10KB/s ingress (can handle all 10 senders at full speed)
1732            oracle
1733                .limit_bandwidth(receiver.clone(), None, Some(10_000))
1734                .await
1735                .unwrap();
1736
1737            // Add links with no latency
1738            for sender in &senders {
1739                oracle
1740                    .add_link(
1741                        sender.clone(),
1742                        receiver.clone(),
1743                        Link {
1744                            latency: Duration::ZERO,
1745                            jitter: Duration::ZERO,
1746                            success_rate: 1.0,
1747                        },
1748                    )
1749                    .await
1750                    .unwrap();
1751            }
1752
1753            let start = context.current();
1754
1755            // All senders send 1KB simultaneously
1756            for (i, mut tx) in sender_txs.into_iter().enumerate() {
1757                let receiver_clone = receiver.clone();
1758                let msg = Bytes::from(vec![i as u8; 1_000]);
1759                tx.send(Recipients::One(receiver_clone), msg, true)
1760                    .await
1761                    .unwrap();
1762            }
1763
1764            // Each sender takes 1s to transmit 1KB at 1KB/s
1765            // All transmissions happen in parallel, so total send time is ~1s
1766
1767            // All 10 messages (10KB total) should be received at ~1s
1768            // Receiver processes at 10KB/s, can handle all 10KB in 1s
1769            for i in 0..10 {
1770                let (_, _msg) = receiver_rx.recv().await.unwrap();
1771                let recv_time = context.current().duration_since(start).unwrap();
1772
1773                // All messages should complete around 1s
1774                assert!(
1775                    recv_time >= Duration::from_millis(950)
1776                        && recv_time <= Duration::from_millis(1100),
1777                    "Message {i} received at {recv_time:?}, expected ~1s",
1778                );
1779            }
1780        });
1781    }
1782
    #[test]
    fn test_dynamic_bandwidth_allocation_staggered() {
        // Test that bandwidth is dynamically allocated as
        // transfers start and complete at different times
        //
        // 3 senders to 1 receiver, starting at different times (t=0, t=0.5s
        // and t=1.5s). Receiver has 30KB/s ingress, senders each have 30KB/s
        // egress, so the receiver is the shared bottleneck whenever more than
        // one transfer is active.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: true,
                    tracked_peer_sets: None,
                },
            );
            network.start();

            // Create 3 senders
            let mut senders = Vec::new();
            let mut sender_txs = Vec::new();
            for i in 0..3 {
                let sender = ed25519::PrivateKey::from_seed(i).public_key();
                senders.push(sender.clone());
                let (tx, _) = oracle
                    .control(sender.clone())
                    .register(0, TEST_QUOTA)
                    .await
                    .unwrap();
                sender_txs.push(tx);

                // Each sender has 30KB/s egress
                oracle
                    .limit_bandwidth(sender.clone(), Some(30_000), None)
                    .await
                    .unwrap();
            }

            // Create receiver with 30KB/s ingress
            let receiver = ed25519::PrivateKey::from_seed(100).public_key();
            let (_, mut receiver_rx) = oracle
                .control(receiver.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();
            oracle
                .limit_bandwidth(receiver.clone(), None, Some(30_000))
                .await
                .unwrap();

            // Add links with minimal latency (1ms, no jitter, no loss)
            for sender in &senders {
                oracle
                    .add_link(
                        sender.clone(),
                        receiver.clone(),
                        Link {
                            latency: Duration::from_millis(1),
                            jitter: Duration::ZERO,
                            success_rate: 1.0,
                        },
                    )
                    .await
                    .unwrap();
            }

            // All timing assertions below are measured relative to this instant
            let start = context.current();

            // Sender 0: sends 30KB at t=0
            // Gets full 30KB/s for the first 0.5s (15KB sent), then shares with
            // sender 1 at 15KB/s until completion at t=1.5s
            let mut tx0 = sender_txs[0].clone();
            let rx_clone = receiver.clone();
            context.clone().spawn(move |_| async move {
                let msg = Bytes::from(vec![0u8; 30_000]);
                tx0.send(Recipients::One(rx_clone), msg, true)
                    .await
                    .unwrap();
            });

            // Sender 1: sends 30KB at t=0.5s
            // Shares bandwidth with sender 0 (15KB/s each) until t=1.5s, at
            // which point 15KB remain. Sender 2 starts at that same moment, so
            // sender 1 is expected to keep sharing the receiver (now with
            // sender 2) rather than getting the full 30KB/s
            let mut tx1 = sender_txs[1].clone();
            let rx_clone = receiver.clone();
            context.clone().spawn(move |context| async move {
                context.sleep(Duration::from_millis(500)).await;
                let msg = Bytes::from(vec![1u8; 30_000]);
                tx1.send(Recipients::One(rx_clone), msg, true)
                    .await
                    .unwrap();
            });

            // Sender 2: sends 15KB at t=1.5s and shares the receiver with
            // sender 1, completing at roughly t=2.5s
            let mut tx2 = sender_txs[2].clone();
            let rx_clone = receiver.clone();
            context.clone().spawn(move |context| async move {
                context.sleep(Duration::from_millis(1500)).await;
                let msg = Bytes::from(vec![2u8; 15_000]);
                tx2.send(Recipients::One(rx_clone), msg, true)
                    .await
                    .unwrap();
            });

            // Receive and verify timing
            // Message 0: starts at t=0, shares bandwidth after 0.5s,
            // and completes at t=1.5s (plus link latency)
            let (_, msg0) = receiver_rx.recv().await.unwrap();
            assert_eq!(msg0[0], 0);
            let t0 = context.current().duration_since(start).unwrap();
            assert!(
                t0 >= Duration::from_millis(1490) && t0 <= Duration::from_millis(1600),
                "Message 0 received at {t0:?}, expected ~1.5s",
            );

            // Messages 1 and 2 are expected to finish at about the same time,
            // so their arrival order is not assumed. Collect the next two
            // messages and verify their timings regardless of order.
            let (_, msg_a) = receiver_rx.recv().await.unwrap();
            let t_a = context.current().duration_since(start).unwrap();

            let (_, msg_b) = receiver_rx.recv().await.unwrap();
            let t_b = context.current().duration_since(start).unwrap();

            // Figure out which message is which based on content (the fill
            // byte is the index of the sender that produced it)
            let (msg1, t1, msg2, t2) = if msg_a[0] == 1 {
                (msg_a, t_a, msg_b, t_b)
            } else {
                (msg_b, t_b, msg_a, t_a)
            };

            assert_eq!(msg1[0], 1);
            assert_eq!(msg2[0], 2);

            // Message 1 (30KB) started at t=0.5s
            // Message 2 (15KB) started at t=1.5s
            // Neither can finish before sender 2 starts at t=1.5s, and both
            // should be done by ~2.5s; the upper bound of 2.6s leaves slack
            // for link latency and rounding
            assert!(
                t1 >= Duration::from_millis(1500) && t1 <= Duration::from_millis(2600),
                "Message 1 received at {t1:?}, expected between 1.5s-2.6s",
            );

            assert!(
                t2 >= Duration::from_millis(1500) && t2 <= Duration::from_millis(2600),
                "Message 2 received at {t2:?}, expected between 1.5s-2.6s",
            );
        });
    }
1934
1935    #[test]
1936    fn test_dynamic_bandwidth_varied_sizes() {
1937        // Test dynamic allocation with different message sizes arriving simultaneously
1938        // This tests that smaller messages complete first when bandwidth is shared
1939        let executor = deterministic::Runner::default();
1940        executor.start(|context| async move {
1941            let (network, mut oracle) = Network::new(
1942                context.with_label("network"),
1943                Config {
1944                    max_size: 1024 * 1024,
1945                    disconnect_on_block: true,
1946                    tracked_peer_sets: None,
1947                },
1948            );
1949            network.start();
1950
1951            // Create 3 senders
1952            let mut senders = Vec::new();
1953            let mut sender_txs = Vec::new();
1954            for i in 0..3 {
1955                let sender = ed25519::PrivateKey::from_seed(i).public_key();
1956                senders.push(sender.clone());
1957                let (tx, _) = oracle
1958                    .control(sender.clone())
1959                    .register(0, TEST_QUOTA)
1960                    .await
1961                    .unwrap();
1962                sender_txs.push(tx);
1963
1964                // Each sender has unlimited egress
1965                oracle
1966                    .limit_bandwidth(sender.clone(), None, None)
1967                    .await
1968                    .unwrap();
1969            }
1970
1971            // Create receiver with 30KB/s ingress
1972            let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1973            let (_, mut receiver_rx) = oracle
1974                .control(receiver.clone())
1975                .register(0, TEST_QUOTA)
1976                .await
1977                .unwrap();
1978            oracle
1979                .limit_bandwidth(receiver.clone(), None, Some(30_000))
1980                .await
1981                .unwrap();
1982
1983            // Add links
1984            for sender in &senders {
1985                oracle
1986                    .add_link(
1987                        sender.clone(),
1988                        receiver.clone(),
1989                        Link {
1990                            latency: Duration::from_millis(1),
1991                            jitter: Duration::ZERO,
1992                            success_rate: 1.0,
1993                        },
1994                    )
1995                    .await
1996                    .unwrap();
1997            }
1998
1999            let start = context.current();
2000
2001            // All start at the same time but with different sizes
2002            //
2003            // The scheduler reserves bandwidth in advance, the actual behavior
2004            // depends on the order tasks are processed. Since all senders
2005            // start at once, they'll compete for bandwidth
2006            let sizes = [10_000, 20_000, 30_000];
2007            for (i, (mut tx, size)) in sender_txs.into_iter().zip(sizes.iter()).enumerate() {
2008                let rx_clone = receiver.clone();
2009                let msg_size = *size;
2010                let msg = Bytes::from(vec![i as u8; msg_size]);
2011                tx.send(Recipients::One(rx_clone), msg, true).await.unwrap();
2012            }
2013
2014            // Receive messages. They arrive in the order they were scheduled,
2015            // not necessarily size order. Collect all messages and sort by
2016            // receive time to verify timing
2017            let mut messages = Vec::new();
2018            for _ in 0..3 {
2019                let (_, msg) = receiver_rx.recv().await.unwrap();
2020                let t = context.current().duration_since(start).unwrap();
2021                messages.push((msg[0] as usize, msg.len(), t));
2022            }
2023
2024            // When all start at once, they'll reserve bandwidth slots
2025            // sequentially. First gets full 30KB/s, others wait or get
2026            // remaining bandwidth. Just verify all messages arrived and total
2027            // time is reasonable
2028            assert_eq!(messages.len(), 3);
2029
2030            // Total data is 60KB at 30KB/s receiver ingress = 2s minimum
2031            let max_time = messages.iter().map(|&(_, _, t)| t).max().unwrap();
2032            assert!(
2033                max_time >= Duration::from_millis(2000),
2034                "Total time {max_time:?} should be at least 2s for 60KB at 30KB/s",
2035            );
2036        });
2037    }
2038
2039    #[test]
2040    fn test_bandwidth_pipe_reservation_duration() {
2041        // Test that bandwidth pipe is only reserved for transmission duration, not latency
2042        // This means new messages can start transmitting while others are still in flight
2043        let executor = deterministic::Runner::default();
2044        executor.start(|context| async move {
2045            let (network, mut oracle) = Network::new(
2046                context.with_label("network"),
2047                Config {
2048                    max_size: 1024 * 1024,
2049                    disconnect_on_block: true,
2050                    tracked_peer_sets: None,
2051                },
2052            );
2053            network.start();
2054
2055            // Create two peers
2056            let sender = PrivateKey::from_seed(1).public_key();
2057            let receiver = PrivateKey::from_seed(2).public_key();
2058
2059            let (sender_tx, _) = oracle
2060                .control(sender.clone())
2061                .register(0, TEST_QUOTA)
2062                .await
2063                .unwrap();
2064            let (_, mut receiver_rx) = oracle
2065                .control(receiver.clone())
2066                .register(0, TEST_QUOTA)
2067                .await
2068                .unwrap();
2069
2070            // Set bandwidth: 1000 B/s (1 byte per millisecond)
2071            oracle
2072                .limit_bandwidth(sender.clone(), Some(1000), None)
2073                .await
2074                .unwrap();
2075            oracle
2076                .limit_bandwidth(receiver.clone(), None, Some(1000))
2077                .await
2078                .unwrap();
2079
2080            // Add link with significant latency (1 second)
2081            oracle
2082                .add_link(
2083                    sender.clone(),
2084                    receiver.clone(),
2085                    Link {
2086                        latency: Duration::from_secs(1), // 1 second latency
2087                        jitter: Duration::ZERO,
2088                        success_rate: 1.0,
2089                    },
2090                )
2091                .await
2092                .unwrap();
2093
2094            // Send 3 messages of 500 bytes each
2095            // At 1000 B/s, each message takes 500ms to transmit
2096            // With 1s latency, if pipe was reserved for tx+latency, total would be:
2097            //   - Msg 1: 0-1500ms (500ms tx + 1000ms latency)
2098            //   - Msg 2: 1500-3000ms (starts after msg 1 fully delivered)
2099            //   - Msg 3: 3000-4500ms
2100            // But if pipe is only reserved during tx (correct behavior):
2101            //   - Msg 1: tx 0-500ms, delivered at 1500ms
2102            //   - Msg 2: tx 500-1000ms, delivered at 2000ms
2103            //   - Msg 3: tx 1000-1500ms, delivered at 2500ms
2104            let start = context.current();
2105
2106            // Send all messages in quick succession
2107            for i in 0..3 {
2108                let mut sender_tx = sender_tx.clone();
2109                let receiver = receiver.clone();
2110                let msg = Bytes::from(vec![i; 500]);
2111                sender_tx
2112                    .send(Recipients::One(receiver), msg, false)
2113                    .await
2114                    .unwrap();
2115            }
2116
2117            // Wait for all receives to complete and record their completion times
2118            let mut receive_times = Vec::new();
2119            for i in 0..3 {
2120                let (_, received) = receiver_rx.recv().await.unwrap();
2121                receive_times.push(context.current().duration_since(start).unwrap());
2122                assert_eq!(received[0], i);
2123            }
2124
2125            // Messages should be received at:
2126            // - Msg 1: ~1500ms (500ms transmission + 1000ms latency)
2127            // - Msg 2: ~2000ms (500ms wait + 500ms transmission + 1000ms latency)
2128            // - Msg 3: ~2500ms (1000ms wait + 500ms transmission + 1000ms latency)
2129            for (i, time) in receive_times.iter().enumerate() {
2130                let expected_min = (i as u64 * 500) + 1500;
2131                let expected_max = expected_min + 100;
2132
2133                assert!(
2134                    *time >= Duration::from_millis(expected_min)
2135                        && *time < Duration::from_millis(expected_max),
2136                    "Message {} should arrive at ~{}ms, got {:?}",
2137                    i + 1,
2138                    expected_min,
2139                    time
2140                );
2141            }
2142        });
2143    }
2144
2145    #[test]
2146    fn test_dynamic_bandwidth_affects_new_transfers() {
2147        // This test verifies that bandwidth changes affect NEW transfers,
2148        // not transfers already in progress (which have their reservations locked in)
2149        let executor = deterministic::Runner::default();
2150        executor.start(|context| async move {
2151            let (network, mut oracle) = Network::new(
2152                context.with_label("network"),
2153                Config {
2154                    max_size: 1024 * 1024,
2155                    disconnect_on_block: true,
2156                    tracked_peer_sets: None,
2157                },
2158            );
2159            network.start();
2160
2161            let pk_sender = PrivateKey::from_seed(1).public_key();
2162            let pk_receiver = PrivateKey::from_seed(2).public_key();
2163
2164            // Register peers and establish link
2165            let (mut sender_tx, _) = oracle
2166                .control(pk_sender.clone())
2167                .register(0, TEST_QUOTA)
2168                .await
2169                .unwrap();
2170            let (_, mut receiver_rx) = oracle
2171                .control(pk_receiver.clone())
2172                .register(0, TEST_QUOTA)
2173                .await
2174                .unwrap();
2175            oracle
2176                .add_link(
2177                    pk_sender.clone(),
2178                    pk_receiver.clone(),
2179                    Link {
2180                        latency: Duration::from_millis(1), // Small latency
2181                        jitter: Duration::ZERO,
2182                        success_rate: 1.0,
2183                    },
2184                )
2185                .await
2186                .unwrap();
2187
2188            // Initial bandwidth: 10 KB/s
2189            oracle
2190                .limit_bandwidth(pk_sender.clone(), Some(10_000), None)
2191                .await
2192                .unwrap();
2193            oracle
2194                .limit_bandwidth(pk_receiver.clone(), None, Some(10_000))
2195                .await
2196                .unwrap();
2197
2198            // Send first message at 10 KB/s
2199            let msg1 = Bytes::from(vec![1u8; 20_000]); // 20 KB
2200            let start_time = context.current();
2201            sender_tx
2202                .send(Recipients::One(pk_receiver.clone()), msg1.clone(), false)
2203                .await
2204                .unwrap();
2205
2206            // Receive first message (should take ~2s at 10KB/s)
2207            let (_sender, received_msg1) = receiver_rx.recv().await.unwrap();
2208            let msg1_time = context.current().duration_since(start_time).unwrap();
2209            assert_eq!(received_msg1.len(), 20_000);
2210            assert!(
2211                msg1_time >= Duration::from_millis(1999)
2212                    && msg1_time <= Duration::from_millis(2010),
2213                "First message should take ~2s, got {msg1_time:?}",
2214            );
2215
2216            // Change bandwidth to 2 KB/s
2217            oracle
2218                .limit_bandwidth(pk_sender.clone(), Some(2_000), None)
2219                .await
2220                .unwrap();
2221
2222            // Send second message at new bandwidth
2223            let msg2 = Bytes::from(vec![2u8; 10_000]); // 10 KB
2224            let msg2_start = context.current();
2225            sender_tx
2226                .send(Recipients::One(pk_receiver.clone()), msg2.clone(), false)
2227                .await
2228                .unwrap();
2229
2230            // Receive second message (should take ~5s at 2KB/s)
2231            let (_sender, received_msg2) = receiver_rx.recv().await.unwrap();
2232            let msg2_time = context.current().duration_since(msg2_start).unwrap();
2233            assert_eq!(received_msg2.len(), 10_000);
2234            assert!(
2235                msg2_time >= Duration::from_millis(4999)
2236                    && msg2_time <= Duration::from_millis(5010),
2237                "Second message should take ~5s at reduced bandwidth, got {msg2_time:?}",
2238            );
2239        });
2240    }
2241
2242    #[test]
2243    fn test_zero_receiver_ingress_bandwidth() {
2244        let executor = deterministic::Runner::default();
2245        executor.start(|context| async move {
2246            let (network, mut oracle) = Network::new(
2247                context.with_label("network"),
2248                Config {
2249                    max_size: 1024 * 1024,
2250                    disconnect_on_block: true,
2251                    tracked_peer_sets: None,
2252                },
2253            );
2254            network.start();
2255
2256            let pk_sender = PrivateKey::from_seed(1).public_key();
2257            let pk_receiver = PrivateKey::from_seed(2).public_key();
2258
2259            // Register peers and establish link
2260            let (mut sender_tx, _) = oracle
2261                .control(pk_sender.clone())
2262                .register(0, TEST_QUOTA)
2263                .await
2264                .unwrap();
2265            let (_, mut receiver_rx) = oracle
2266                .control(pk_receiver.clone())
2267                .register(0, TEST_QUOTA)
2268                .await
2269                .unwrap();
2270            oracle
2271                .add_link(
2272                    pk_sender.clone(),
2273                    pk_receiver.clone(),
2274                    Link {
2275                        latency: Duration::ZERO,
2276                        jitter: Duration::ZERO,
2277                        success_rate: 1.0,
2278                    },
2279                )
2280                .await
2281                .unwrap();
2282
2283            // Set sender bandwidth to 0
2284            oracle
2285                .limit_bandwidth(pk_receiver.clone(), None, Some(0))
2286                .await
2287                .unwrap();
2288
2289            // Send message to receiver
2290            let msg1 = Bytes::from(vec![1u8; 20_000]); // 20 KB
2291            let sent = sender_tx
2292                .send(Recipients::One(pk_receiver.clone()), msg1.clone(), false)
2293                .await
2294                .unwrap();
2295            assert_eq!(sent.len(), 1);
2296            assert_eq!(sent[0], pk_receiver);
2297
2298            // Message should not be received after 10 seconds
2299            select! {
2300                _ = receiver_rx.recv() => {
2301                    panic!("unexpected message");
2302                },
2303                _ = context.sleep(Duration::from_secs(10)) => {},
2304            }
2305
2306            // Unset bandwidth
2307            oracle
2308                .limit_bandwidth(pk_receiver.clone(), None, None)
2309                .await
2310                .unwrap();
2311
2312            // Message should be immediately received
2313            select! {
2314                _ = receiver_rx.recv() => {},
2315                _ = context.sleep(Duration::from_secs(1)) => {
2316                    panic!("timeout");
2317                },
2318            }
2319        });
2320    }
2321
2322    #[test]
2323    fn test_zero_sender_egress_bandwidth() {
2324        let executor = deterministic::Runner::default();
2325        executor.start(|context| async move {
2326            let (network, mut oracle) = Network::new(
2327                context.with_label("network"),
2328                Config {
2329                    max_size: 1024 * 1024,
2330                    disconnect_on_block: true,
2331                    tracked_peer_sets: None,
2332                },
2333            );
2334            network.start();
2335
2336            let pk_sender = PrivateKey::from_seed(1).public_key();
2337            let pk_receiver = PrivateKey::from_seed(2).public_key();
2338
2339            // Register peers and establish link
2340            let (mut sender_tx, _) = oracle
2341                .control(pk_sender.clone())
2342                .register(0, TEST_QUOTA)
2343                .await
2344                .unwrap();
2345            let (_, mut receiver_rx) = oracle
2346                .control(pk_receiver.clone())
2347                .register(0, TEST_QUOTA)
2348                .await
2349                .unwrap();
2350            oracle
2351                .add_link(
2352                    pk_sender.clone(),
2353                    pk_receiver.clone(),
2354                    Link {
2355                        latency: Duration::ZERO,
2356                        jitter: Duration::ZERO,
2357                        success_rate: 1.0,
2358                    },
2359                )
2360                .await
2361                .unwrap();
2362
2363            // Set sender bandwidth to 0
2364            oracle
2365                .limit_bandwidth(pk_sender.clone(), Some(0), None)
2366                .await
2367                .unwrap();
2368
2369            // Send message to receiver
2370            let msg1 = Bytes::from(vec![1u8; 20_000]); // 20 KB
2371            let sent = sender_tx
2372                .send(Recipients::One(pk_receiver.clone()), msg1.clone(), false)
2373                .await
2374                .unwrap();
2375            assert_eq!(sent.len(), 1);
2376            assert_eq!(sent[0], pk_receiver);
2377
2378            // Message should not be received after 10 seconds
2379            select! {
2380                _ = receiver_rx.recv() => {
2381                    panic!("unexpected message");
2382                },
2383                _ = context.sleep(Duration::from_secs(10)) => {},
2384            }
2385
2386            // Unset bandwidth
2387            oracle
2388                .limit_bandwidth(pk_sender.clone(), None, None)
2389                .await
2390                .unwrap();
2391
2392            // Message should be immediately received
2393            select! {
2394                _ = receiver_rx.recv() => {},
2395                _ = context.sleep(Duration::from_secs(1)) => {
2396                    panic!("timeout");
2397                },
2398            }
2399        });
2400    }
2401
2402    #[test]
2403    fn register_peer_set() {
2404        let executor = deterministic::Runner::default();
2405        executor.start(|context| async move {
2406            let (network, oracle) = Network::new(
2407                context.with_label("network"),
2408                Config {
2409                    max_size: 1024 * 1024,
2410                    disconnect_on_block: true,
2411                    tracked_peer_sets: Some(3),
2412                },
2413            );
2414            network.start();
2415
2416            let mut manager = oracle.manager();
2417            assert_eq!(manager.peer_set(0).await, Some([].try_into().unwrap()));
2418
2419            let pk1 = PrivateKey::from_seed(1).public_key();
2420            let pk2 = PrivateKey::from_seed(2).public_key();
2421            manager
2422                .update(0xFF, [pk1.clone(), pk2.clone()].try_into().unwrap())
2423                .await;
2424
2425            assert_eq!(
2426                manager.peer_set(0xFF).await.unwrap(),
2427                [pk1, pk2].try_into().unwrap()
2428            );
2429        });
2430    }
2431
2432    #[test]
2433    fn test_socket_manager() {
2434        let executor = deterministic::Runner::default();
2435        executor.start(|context| async move {
2436            let (network, oracle) = Network::new(
2437                context.with_label("network"),
2438                Config {
2439                    max_size: 1024 * 1024,
2440                    disconnect_on_block: true,
2441                    tracked_peer_sets: Some(3),
2442                },
2443            );
2444            network.start();
2445
2446            let pk1 = PrivateKey::from_seed(1).public_key();
2447            let pk2 = PrivateKey::from_seed(2).public_key();
2448            let addr1: Address = SocketAddr::from(([127, 0, 0, 1], 4000)).into();
2449            let addr2: Address = SocketAddr::from(([127, 0, 0, 1], 4001)).into();
2450
2451            let mut manager = oracle.socket_manager();
2452            let peers: Map<_, _> = [(pk1.clone(), addr1.clone()), (pk2.clone(), addr2.clone())]
2453                .try_into()
2454                .unwrap();
2455            manager.update(1, peers).await;
2456
2457            let peer_set = manager.peer_set(1).await.expect("peer set missing");
2458            let keys: Vec<_> = Vec::from(peer_set.clone());
2459            assert_eq!(keys, vec![pk1.clone(), pk2.clone()]);
2460
2461            let mut subscription = manager.subscribe().await;
2462            let (id, latest, all) = subscription.next().await.unwrap();
2463            assert_eq!(id, 1);
2464            let latest_keys: Vec<_> = Vec::from(latest.clone());
2465            assert_eq!(latest_keys, vec![pk1.clone(), pk2.clone()]);
2466            let all_keys: Vec<_> = Vec::from(all.clone());
2467            assert_eq!(all_keys, vec![pk1.clone(), pk2.clone()]);
2468
2469            let peers: Map<_, _> = [(pk2.clone(), addr2)].try_into().unwrap();
2470            manager.update(2, peers).await;
2471
2472            let (id, latest, all) = subscription.next().await.unwrap();
2473            assert_eq!(id, 2);
2474            let latest_keys: Vec<_> = Vec::from(latest);
2475            assert_eq!(latest_keys, vec![pk2.clone()]);
2476            let all_keys: Vec<_> = Vec::from(all);
2477            assert_eq!(all_keys, vec![pk1, pk2]);
2478        });
2479    }
2480
2481    #[test]
2482    fn test_socket_manager_with_asymmetric_addresses() {
2483        let executor = deterministic::Runner::default();
2484        executor.start(|context| async move {
2485            let (network, oracle) = Network::new(
2486                context.with_label("network"),
2487                Config {
2488                    max_size: 1024 * 1024,
2489                    disconnect_on_block: true,
2490                    tracked_peer_sets: Some(3),
2491                },
2492            );
2493            network.start();
2494
2495            let pk1 = PrivateKey::from_seed(1).public_key();
2496            let pk2 = PrivateKey::from_seed(2).public_key();
2497
2498            // Use asymmetric addresses where ingress (dial) differs from egress (filter)
2499            let addr1 = Address::Asymmetric {
2500                ingress: Ingress::Socket(SocketAddr::from(([10, 0, 0, 1], 8080))),
2501                egress: SocketAddr::from(([192, 168, 1, 1], 9090)),
2502            };
2503            let addr2 = Address::Asymmetric {
2504                ingress: Ingress::Dns {
2505                    host: hostname!("node2.example.com"),
2506                    port: 8080,
2507                },
2508                egress: SocketAddr::from(([192, 168, 1, 2], 9090)),
2509            };
2510
2511            let mut manager = oracle.socket_manager();
2512            let peers: Map<_, _> = [(pk1.clone(), addr1), (pk2.clone(), addr2)]
2513                .try_into()
2514                .unwrap();
2515            manager.update(1, peers).await;
2516
2517            // Verify peer set contains expected keys (addresses are ignored by simulated network)
2518            let peer_set = manager.peer_set(1).await.expect("peer set missing");
2519            let keys: Vec<_> = Vec::from(peer_set);
2520            assert_eq!(keys, vec![pk1.clone(), pk2.clone()]);
2521
2522            // Verify subscription works
2523            let mut subscription = manager.subscribe().await;
2524            let (id, latest, _all) = subscription.next().await.unwrap();
2525            assert_eq!(id, 1);
2526            let latest_keys: Vec<_> = Vec::from(latest);
2527            assert_eq!(latest_keys, vec![pk1, pk2]);
2528        });
2529    }
2530
2531    #[test]
2532    fn test_peer_set_window_management() {
2533        let executor = deterministic::Runner::default();
2534        executor.start(|context| async move {
2535            let (network, mut oracle) = Network::new(
2536                context.with_label("network"),
2537                Config {
2538                    max_size: 1024 * 1024,
2539                    disconnect_on_block: true,
2540                    tracked_peer_sets: Some(2), // Only track 2 peer sets
2541                },
2542            );
2543            network.start();
2544
2545            // Create 4 peers
2546            let pk1 = PrivateKey::from_seed(1).public_key();
2547            let pk2 = PrivateKey::from_seed(2).public_key();
2548            let pk3 = PrivateKey::from_seed(3).public_key();
2549            let pk4 = PrivateKey::from_seed(4).public_key();
2550
2551            // Register first peer set with pk1 and pk2
2552            let mut manager = oracle.manager();
2553            manager
2554                .update(1, vec![pk1.clone(), pk2.clone()].try_into().unwrap())
2555                .await;
2556
2557            // Register channels for all peers
2558            let (mut sender1, _receiver1) = oracle
2559                .control(pk1.clone())
2560                .register(0, TEST_QUOTA)
2561                .await
2562                .unwrap();
2563            let (mut sender2, _receiver2) = oracle
2564                .control(pk2.clone())
2565                .register(0, TEST_QUOTA)
2566                .await
2567                .unwrap();
2568            let (mut sender3, _receiver3) = oracle
2569                .control(pk3.clone())
2570                .register(0, TEST_QUOTA)
2571                .await
2572                .unwrap();
2573            let (_mut_sender4, _receiver4) = oracle
2574                .control(pk4.clone())
2575                .register(0, TEST_QUOTA)
2576                .await
2577                .unwrap();
2578
2579            // Create bidirectional links between all peers
2580            for peer_a in &[pk1.clone(), pk2.clone(), pk3.clone(), pk4.clone()] {
2581                for peer_b in &[pk1.clone(), pk2.clone(), pk3.clone(), pk4.clone()] {
2582                    if peer_a != peer_b {
2583                        oracle
2584                            .add_link(
2585                                peer_a.clone(),
2586                                peer_b.clone(),
2587                                Link {
2588                                    latency: Duration::from_millis(1),
2589                                    jitter: Duration::ZERO,
2590                                    success_rate: 1.0,
2591                                },
2592                            )
2593                            .await
2594                            .unwrap();
2595                    }
2596                }
2597            }
2598
2599            // Send message from pk1 to pk2 (both in tracked set) - should succeed
2600            let sent = sender1
2601                .send(Recipients::One(pk2.clone()), Bytes::from("msg1"), false)
2602                .await
2603                .unwrap();
2604            assert_eq!(sent.len(), 1);
2605
2606            // Try to send from pk1 to pk3 (pk3 not in any tracked set) - should fail
2607            let sent = sender1
2608                .send(Recipients::One(pk3.clone()), Bytes::from("msg2"), false)
2609                .await
2610                .unwrap();
2611            assert_eq!(sent.len(), 0);
2612
2613            // Register second peer set with pk2 and pk3
2614            manager
2615                .update(2, vec![pk2.clone(), pk3.clone()].try_into().unwrap())
2616                .await;
2617
2618            // Now pk3 is in a tracked set, message should succeed
2619            let sent = sender1
2620                .send(Recipients::One(pk3.clone()), Bytes::from("msg3"), false)
2621                .await
2622                .unwrap();
2623            assert_eq!(sent.len(), 1);
2624
2625            // Register third peer set with pk3 and pk4 (this will evict peer set 1)
2626            manager
2627                .update(3, vec![pk3.clone(), pk4.clone()].try_into().unwrap())
2628                .await;
2629
2630            // pk1 should now be removed from all tracked sets
2631            // Try to send from pk2 to pk1 - should fail since pk1 is no longer tracked
2632            let sent = sender2
2633                .send(Recipients::One(pk1.clone()), Bytes::from("msg4"), false)
2634                .await
2635                .unwrap();
2636            assert_eq!(sent.len(), 0);
2637
2638            // pk3 should still be reachable (in sets 2 and 3)
2639            let sent = sender2
2640                .send(Recipients::One(pk3.clone()), Bytes::from("msg5"), false)
2641                .await
2642                .unwrap();
2643            assert_eq!(sent.len(), 1);
2644
2645            // pk4 should be reachable (in set 3)
2646            let sent = sender3
2647                .send(Recipients::One(pk4.clone()), Bytes::from("msg6"), false)
2648                .await
2649                .unwrap();
2650            assert_eq!(sent.len(), 1);
2651
2652            // Verify peer set contents
2653            let peer_set_2 = manager.peer_set(2).await.unwrap();
2654            assert!(peer_set_2.as_ref().contains(&pk2));
2655            assert!(peer_set_2.as_ref().contains(&pk3));
2656
2657            let peer_set_3 = manager.peer_set(3).await.unwrap();
2658            assert!(peer_set_3.as_ref().contains(&pk3));
2659            assert!(peer_set_3.as_ref().contains(&pk4));
2660
2661            // Peer set 1 should no longer exist
2662            assert!(manager.peer_set(1).await.is_none());
2663        });
2664    }
2665
2666    #[test]
2667    fn test_sender_removed_from_tracked_peer_set_drops_message() {
2668        let executor = deterministic::Runner::default();
2669        executor.start(|context| async move {
2670            // Create a simulated network
2671            let (network, mut oracle) = Network::new(
2672                context.with_label("network"),
2673                Config {
2674                    max_size: 1024 * 1024,
2675                    disconnect_on_block: true,
2676                    tracked_peer_sets: Some(1),
2677                },
2678            );
2679            network.start();
2680            let mut manager = oracle.manager();
2681            let mut subscription = manager.subscribe().await;
2682
2683            // Register a peer set
2684            let sender_pk = PrivateKey::from_seed(1).public_key();
2685            let recipient_pk = PrivateKey::from_seed(2).public_key();
2686            manager
2687                .update(
2688                    1,
2689                    vec![sender_pk.clone(), recipient_pk.clone()]
2690                        .try_into()
2691                        .unwrap(),
2692                )
2693                .await;
2694            let (id, _, _) = subscription.next().await.unwrap();
2695            assert_eq!(id, 1);
2696
2697            // Register channels
2698            let (mut sender, _) = oracle
2699                .control(sender_pk.clone())
2700                .register(0, TEST_QUOTA)
2701                .await
2702                .unwrap();
2703            let (_sender2, mut receiver) = oracle
2704                .control(recipient_pk.clone())
2705                .register(0, TEST_QUOTA)
2706                .await
2707                .unwrap();
2708
2709            // Add link
2710            oracle
2711                .add_link(
2712                    sender_pk.clone(),
2713                    recipient_pk.clone(),
2714                    Link {
2715                        latency: Duration::from_millis(1),
2716                        jitter: Duration::ZERO,
2717                        success_rate: 1.0,
2718                    },
2719                )
2720                .await
2721                .unwrap();
2722
2723            // Send and confirm message
2724            let initial_msg = Bytes::from("tracked");
2725            let sent = sender
2726                .send(
2727                    Recipients::One(recipient_pk.clone()),
2728                    initial_msg.clone(),
2729                    false,
2730                )
2731                .await
2732                .unwrap();
2733            assert_eq!(sent.len(), 1);
2734            assert_eq!(sent[0], recipient_pk);
2735            let (_pk, received) = receiver.recv().await.unwrap();
2736            assert_eq!(received, initial_msg);
2737
2738            // Register another peer set
2739            let other_pk = PrivateKey::from_seed(3).public_key();
2740            manager
2741                .update(2, vec![recipient_pk.clone(), other_pk].try_into().unwrap())
2742                .await;
2743            let (id, _, _) = subscription.next().await.unwrap();
2744            assert_eq!(id, 2);
2745
2746            // Send message from untracked peer
2747            let sent = sender
2748                .send(
2749                    Recipients::One(recipient_pk.clone()),
2750                    Bytes::from("untracked"),
2751                    false,
2752                )
2753                .await
2754                .unwrap();
2755            assert!(sent.is_empty());
2756
2757            // Confirm message was not delivered
2758            select! {
2759                _ = receiver.recv() => {
2760                    panic!("unexpected message");
2761                },
2762                _ = context.sleep(Duration::from_secs(10)) => {},
2763            }
2764
2765            // Add a peer back to the tracked set
2766            manager
2767                .update(
2768                    3,
2769                    vec![sender_pk.clone(), recipient_pk.clone()]
2770                        .try_into()
2771                        .unwrap(),
2772                )
2773                .await;
2774            let (id, _, _) = subscription.next().await.unwrap();
2775            assert_eq!(id, 3);
2776
2777            // Send message from tracked peer (now back in a peer set)
2778            let sent = sender
2779                .send(
2780                    Recipients::One(recipient_pk.clone()),
2781                    initial_msg.clone(),
2782                    false,
2783                )
2784                .await
2785                .unwrap();
2786            assert_eq!(sent.len(), 1);
2787            assert_eq!(sent[0], recipient_pk);
2788            let (_pk, received) = receiver.recv().await.unwrap();
2789            assert_eq!(received, initial_msg);
2790        });
2791    }
2792
2793    #[test]
2794    fn test_subscribe_to_peer_sets() {
2795        let executor = deterministic::Runner::default();
2796        executor.start(|context| async move {
2797            let (network, oracle) = Network::new(
2798                context.with_label("network"),
2799                Config {
2800                    max_size: 1024 * 1024,
2801                    disconnect_on_block: true,
2802                    tracked_peer_sets: Some(2),
2803                },
2804            );
2805            network.start();
2806
2807            // Subscribe to peer set updates
2808            let mut manager = oracle.manager();
2809            let mut subscription = manager.subscribe().await;
2810
2811            // Create peers
2812            let pk1 = PrivateKey::from_seed(1).public_key();
2813            let pk2 = PrivateKey::from_seed(2).public_key();
2814            let pk3 = PrivateKey::from_seed(3).public_key();
2815
2816            // Register first peer set
2817            manager
2818                .update(1, vec![pk1.clone(), pk2.clone()].try_into().unwrap())
2819                .await;
2820
2821            // Verify we receive the notification
2822            let (peer_set_id, peer_set, all) = subscription.next().await.unwrap();
2823            assert_eq!(peer_set_id, 1);
2824            assert_eq!(peer_set, vec![pk1.clone(), pk2.clone()].try_into().unwrap());
2825            assert_eq!(all, vec![pk1.clone(), pk2.clone()].try_into().unwrap());
2826
2827            // Register second peer set
2828            manager
2829                .update(2, vec![pk2.clone(), pk3.clone()].try_into().unwrap())
2830                .await;
2831
2832            // Verify we receive the notification
2833            let (peer_set_id, peer_set, all) = subscription.next().await.unwrap();
2834            assert_eq!(peer_set_id, 2);
2835            assert_eq!(peer_set, vec![pk2.clone(), pk3.clone()].try_into().unwrap());
2836            assert_eq!(
2837                all,
2838                vec![pk1.clone(), pk2.clone(), pk3.clone()]
2839                    .try_into()
2840                    .unwrap()
2841            );
2842
2843            // Register third peer set
2844            manager
2845                .update(3, vec![pk1.clone(), pk3.clone()].try_into().unwrap())
2846                .await;
2847
2848            // Verify we receive the notification
2849            let (peer_set_id, peer_set, all) = subscription.next().await.unwrap();
2850            assert_eq!(peer_set_id, 3);
2851            assert_eq!(peer_set, vec![pk1.clone(), pk3.clone()].try_into().unwrap());
2852            assert_eq!(
2853                all,
2854                vec![pk1.clone(), pk2.clone(), pk3.clone()]
2855                    .try_into()
2856                    .unwrap()
2857            );
2858
2859            // Register fourth peer set
2860            manager
2861                .update(4, vec![pk1.clone(), pk3.clone()].try_into().unwrap())
2862                .await;
2863
2864            // Verify we receive the notification
2865            let (peer_set_id, peer_set, all) = subscription.next().await.unwrap();
2866            assert_eq!(peer_set_id, 4);
2867            assert_eq!(peer_set, vec![pk1.clone(), pk3.clone()].try_into().unwrap());
2868            assert_eq!(all, vec![pk1.clone(), pk3.clone()].try_into().unwrap());
2869        });
2870    }
2871
2872    #[test]
2873    fn test_multiple_subscriptions() {
2874        let executor = deterministic::Runner::default();
2875        executor.start(|context| async move {
2876            let (network, oracle) = Network::new(
2877                context.with_label("network"),
2878                Config {
2879                    max_size: 1024 * 1024,
2880                    disconnect_on_block: true,
2881                    tracked_peer_sets: Some(3),
2882                },
2883            );
2884            network.start();
2885
2886            // Create multiple subscriptions
2887            let mut manager = oracle.manager();
2888            let mut subscription1 = manager.subscribe().await;
2889            let mut subscription2 = manager.subscribe().await;
2890            let mut subscription3 = manager.subscribe().await;
2891
2892            // Create peers
2893            let pk1 = PrivateKey::from_seed(1).public_key();
2894            let pk2 = PrivateKey::from_seed(2).public_key();
2895
2896            // Register a peer set
2897            manager
2898                .update(1, vec![pk1.clone(), pk2.clone()].try_into().unwrap())
2899                .await;
2900
2901            // Verify all subscriptions receive the notification
2902            let (id1, _, _) = subscription1.next().await.unwrap();
2903            let (id2, _, _) = subscription2.next().await.unwrap();
2904            let (id3, _, _) = subscription3.next().await.unwrap();
2905
2906            assert_eq!(id1, 1);
2907            assert_eq!(id2, 1);
2908            assert_eq!(id3, 1);
2909
2910            // Drop one subscription
2911            drop(subscription2);
2912
2913            // Register another peer set
2914            manager
2915                .update(2, vec![pk1.clone(), pk2.clone()].try_into().unwrap())
2916                .await;
2917
2918            // Verify remaining subscriptions still receive notifications
2919            let (id1, _, _) = subscription1.next().await.unwrap();
2920            let (id3, _, _) = subscription3.next().await.unwrap();
2921
2922            assert_eq!(id1, 2);
2923            assert_eq!(id3, 2);
2924        });
2925    }
2926
2927    #[test]
2928    fn test_subscription_includes_self_when_registered() {
2929        let executor = deterministic::Runner::default();
2930        executor.start(|context| async move {
2931            let (network, oracle) = Network::new(
2932                context.with_label("network"),
2933                Config {
2934                    max_size: 1024 * 1024,
2935                    disconnect_on_block: true,
2936                    tracked_peer_sets: Some(2),
2937                },
2938            );
2939            network.start();
2940
2941            // Create "self" and "other" peers
2942            let self_pk = PrivateKey::from_seed(0).public_key();
2943            let other_pk = PrivateKey::from_seed(1).public_key();
2944
2945            // Register a channel for self (this creates the peer in the network)
2946            let (_sender, _receiver) = oracle
2947                .control(self_pk.clone())
2948                .register(0, TEST_QUOTA)
2949                .await
2950                .unwrap();
2951
2952            // Subscribe to peer set updates
2953            let mut manager = oracle.manager();
2954            let mut subscription = manager.subscribe().await;
2955
2956            // Register a peer set that does NOT include self
2957            manager
2958                .update(1, vec![other_pk.clone()].try_into().unwrap())
2959                .await;
2960
2961            // Receive subscription notification
2962            let (id, new, all) = subscription.next().await.unwrap();
2963            assert_eq!(id, 1);
2964            assert_eq!(new.len(), 1);
2965            assert_eq!(all.len(), 1);
2966
2967            // Self should NOT be in the new set
2968            assert!(
2969                new.position(&self_pk).is_none(),
2970                "new set should not include self"
2971            );
2972            assert!(
2973                new.position(&other_pk).is_some(),
2974                "new set should include other"
2975            );
2976
2977            // Self should NOT be in the tracked set (not registered)
2978            assert!(
2979                all.position(&self_pk).is_none(),
2980                "tracked peers should not include self"
2981            );
2982            assert!(
2983                all.position(&other_pk).is_some(),
2984                "tracked peers should include other"
2985            );
2986
2987            // Now register a peer set that DOES include self
2988            manager
2989                .update(
2990                    2,
2991                    vec![self_pk.clone(), other_pk.clone()].try_into().unwrap(),
2992                )
2993                .await;
2994
2995            let (id, new, all) = subscription.next().await.unwrap();
2996            assert_eq!(id, 2);
2997            assert_eq!(new.len(), 2);
2998            assert_eq!(all.len(), 2);
2999
3000            // Both peers should be in the new set
3001            assert!(
3002                new.position(&self_pk).is_some(),
3003                "new set should include self"
3004            );
3005            assert!(
3006                new.position(&other_pk).is_some(),
3007                "new set should include other"
3008            );
3009
3010            // Both peers should be in the tracked set
3011            assert!(
3012                all.position(&self_pk).is_some(),
3013                "tracked peers should include self"
3014            );
3015            assert!(
3016                all.position(&other_pk).is_some(),
3017                "tracked peers should include other"
3018            );
3019        });
3020    }
3021
3022    #[test]
3023    fn test_rate_limiting() {
3024        let executor = deterministic::Runner::default();
3025        executor.start(|context| async move {
3026            let cfg = Config {
3027                max_size: 1024 * 1024,
3028                disconnect_on_block: true,
3029                tracked_peer_sets: Some(3),
3030            };
3031            let network_context = context.with_label("network");
3032            let (network, mut oracle) = Network::new(network_context.clone(), cfg);
3033            network.start();
3034
3035            // Create two public keys
3036            let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
3037            let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
3038
3039            // Register the peer set
3040            let mut manager = oracle.manager();
3041            manager
3042                .update(0, [pk1.clone(), pk2.clone()].try_into().unwrap())
3043                .await;
3044
3045            // Register with a very restrictive quota: 1 message per second
3046            let restrictive_quota = Quota::per_second(NZU32!(1));
3047            let mut control1 = oracle.control(pk1.clone());
3048            let (mut sender, _) = control1.register(0, restrictive_quota).await.unwrap();
3049            let mut control2 = oracle.control(pk2.clone());
3050            let (_, mut receiver) = control2.register(0, TEST_QUOTA).await.unwrap();
3051
3052            // Add bidirectional links
3053            let link = ingress::Link {
3054                latency: Duration::from_millis(0),
3055                jitter: Duration::from_millis(0),
3056                success_rate: 1.0,
3057            };
3058            oracle
3059                .add_link(pk1.clone(), pk2.clone(), link.clone())
3060                .await
3061                .unwrap();
3062            oracle.add_link(pk2.clone(), pk1, link).await.unwrap();
3063
3064            // First message should succeed immediately
3065            let msg1 = Bytes::from_static(b"message1");
3066            let result1 = sender
3067                .send(Recipients::One(pk2.clone()), msg1.clone(), false)
3068                .await
3069                .unwrap();
3070            assert_eq!(result1.len(), 1, "first message should be sent");
3071
3072            // Verify first message is received
3073            let (_, received1) = receiver.recv().await.unwrap();
3074            assert_eq!(received1, msg1);
3075
3076            // Second message should be rate-limited (quota is 1/sec, no time has passed)
3077            let msg2 = Bytes::from_static(b"message2");
3078            let result2 = sender
3079                .send(Recipients::One(pk2.clone()), msg2.clone(), false)
3080                .await
3081                .unwrap();
3082            assert_eq!(
3083                result2.len(),
3084                0,
3085                "second message should be rate-limited (skipped)"
3086            );
3087
3088            // Advance time by 1 second to allow the rate limiter to reset
3089            context.sleep(Duration::from_secs(1)).await;
3090
3091            // Third message should succeed after waiting
3092            let msg3 = Bytes::from_static(b"message3");
3093            let result3 = sender
3094                .send(Recipients::One(pk2.clone()), msg3.clone(), false)
3095                .await
3096                .unwrap();
3097            assert_eq!(result3.len(), 1, "third message should be sent after wait");
3098
3099            // Verify third message is received
3100            let (_, received3) = receiver.recv().await.unwrap();
3101            assert_eq!(received3, msg3);
3102        });
3103    }
3104}