commonware_p2p/simulated/
mod.rs

1//! Simulate networking between peers with configurable link behavior (i.e. drops, latency, corruption, etc.).
2//!
3//! Both peer and link modification can be performed dynamically over the lifetime of the simulated network. This
4//! can be used to mimic transient network partitions, offline nodes (that later connect), and/or degrading link
5//! quality. Messages on a link are delivered in order, and optional per-peer bandwidth limits account for
6//! transmission delay and queueing.
7//!
8//! # Determinism
9//!
10//! `commonware-p2p::simulated` can be run deterministically when paired with `commonware-runtime::deterministic`.
11//! This makes it possible to reproduce an arbitrary order of delivered/dropped messages with a given seed.
12//!
13//! # Bandwidth Simulation
14//!
15//! The simulator provides a realistic model of bandwidth contention where network
16//! capacity is a shared, finite resource. Bandwidth is allocated via progressive
17//! filling to provide max-min fairness.
18//!
19//! _If no bandwidth constraints are provided (default behavior), progressive filling and bandwidth
20//! tracking are not performed (avoiding unnecessary overhead for minimal p2p testing common in CI)._
21//!
22//! ## Core Model
23//!
24//! Whenever a transfer starts or finishes, or a bandwidth limit is updated, we execute a scheduling tick:
25//!
26//! 1. **Collect Active Flows:** Gather every active transfer that still has
27//!    bytes to send. A flow is bound to one sender and to one receiver (if the message will be delivered).
28//! 2. **Compute Progressive Filling:** Run progressive filling to raise the rate of
29//!    every active flow in lock-step until some sender's egress or receiver's ingress
30//!    limit saturates (at which point the flow is frozen and the process repeats with what remains).
31//! 3. **Wait for the Next Event:** Using those rates, determine which flow will
32//!    finish first by computing how long it needs to transmit its remaining
33//!    bytes. Advance simulated time directly to that completion instant (advancing all other flows
34//!    by the bytes transferred over the interval).
35//! 4. **Deliver Message:** Remove the completed flow and pass the message to the receiver. Repeat from step 1
36//!    until all flows are processed.
37//!
38//! _Messages between the same pair of peers remain strictly ordered. When one
39//! message finishes, the next message on that link may begin sending at
40//! `arrival_time - new_latency` so that its first byte arrives immediately after
41//! the previous one is fully received._
42//!
43//! ## Latency vs. Transmission Delay
44//!
45//! The simulation correctly distinguishes between two key components of message delivery:
46//!
47//! - **Transmission Delay:** The time it takes to send all bytes of a message over
48//!   the link. This is determined by the message size and the available bandwidth
49//!   (e.g., a 10KB message on a 10KB/s link has a 1-second transmission delay).
50//! - **Network Latency:** The time it takes for a byte to travel from the sender
51//!   to the receiver, independent of bandwidth. This is configured via the `Link`
52//!   properties.
53//!
//! The final delivery time of a message is the time at which its transmission completes
//! plus the simulated network latency. This model ensures that large messages correctly
56//! occupy the network link for longer periods, affecting other concurrent transfers,
57//! while still accounting for the physical travel time of the data.
58//!
59//! # Example
60//!
61//! ```rust
62//! use commonware_p2p::{Manager, simulated::{Config, Link, Network}};
63//! use commonware_cryptography::{ed25519, PrivateKey, Signer as _, PublicKey as _, PrivateKeyExt as _};
64//! use commonware_runtime::{deterministic, Spawner, Runner, Metrics};
65//! use std::time::Duration;
66//!
67//! // Generate peers
68//! let peers = vec![
69//!     ed25519::PrivateKey::from_seed(0).public_key(),
70//!     ed25519::PrivateKey::from_seed(1).public_key(),
71//!     ed25519::PrivateKey::from_seed(2).public_key(),
72//!     ed25519::PrivateKey::from_seed(3).public_key(),
73//! ];
74//!
75//! // Configure network
76//! let p2p_cfg = Config {
77//!     max_size: 1024 * 1024, // 1MB
78//!     disconnect_on_block: true,
79//!     tracked_peer_sets: Some(3),
80//! };
81//!
82//! // Start context
83//! let executor = deterministic::Runner::seeded(0);
84//! executor.start(|context| async move {
85//!     // Initialize network
86//!     let (network, mut oracle) = Network::new(context.with_label("network"), p2p_cfg);
87//!
88//!     // Start network
89//!     let network_handler = network.start();
90//!
91//!     // Register a peer set
92//!     oracle.update(0, peers.clone().into()).await;
93//!
94//!     let (sender1, receiver1) = oracle.control(peers[0].clone()).register(0).await.unwrap();
95//!     let (sender2, receiver2) = oracle.control(peers[1].clone()).register(0).await.unwrap();
96//!
97//!     // Set bandwidth limits
98//!     // peer[0]: 10KB/s egress, unlimited ingress
99//!     // peer[1]: unlimited egress, 5KB/s ingress
100//!     oracle.limit_bandwidth(peers[0].clone(), Some(10_000), None).await.unwrap();
101//!     oracle.limit_bandwidth(peers[1].clone(), None, Some(5_000)).await.unwrap();
102//!
103//!     // Link 2 peers
104//!     oracle.add_link(
105//!         peers[0].clone(),
106//!         peers[1].clone(),
107//!         Link {
108//!             latency: Duration::from_millis(5),
109//!             jitter: Duration::from_millis(2),
110//!             success_rate: 0.75,
111//!         },
112//!     ).await.unwrap();
113//!
114//!     // ... Use sender and receiver ...
115//!
116//!     // Update link
117//!     oracle.remove_link(
118//!         peers[0].clone(),
119//!         peers[1].clone(),
120//!     ).await.unwrap();
121//!     oracle.add_link(
122//!         peers[0].clone(),
123//!         peers[1].clone(),
124//!         Link {
125//!             latency: Duration::from_millis(100),
126//!             jitter: Duration::from_millis(25),
127//!             success_rate: 0.8,
128//!         },
129//!     ).await.unwrap();
130//!
131//!     // ... Use sender and receiver ...
132//!
133//!     // Shutdown network
134//!     network_handler.abort();
135//! });
136//! ```
137
138mod bandwidth;
139mod ingress;
140mod metrics;
141mod network;
142mod transmitter;
143
144use thiserror::Error;
145
146/// Errors that can occur when interacting with the network.
147#[derive(Debug, Error)]
148pub enum Error {
149    #[error("message too large: {0}")]
150    MessageTooLarge(usize),
151    #[error("network closed")]
152    NetworkClosed,
153    #[error("not valid to link self")]
154    LinkingSelf,
155    #[error("link already exists")]
156    LinkExists,
157    #[error("link missing")]
158    LinkMissing,
159    #[error("invalid success rate (must be in [0, 1]): {0}")]
160    InvalidSuccessRate(f64),
161    #[error("channel already registered: {0}")]
162    ChannelAlreadyRegistered(u64),
163    #[error("send_frame failed")]
164    SendFrameFailed,
165    #[error("recv_frame failed")]
166    RecvFrameFailed,
167    #[error("bind failed")]
168    BindFailed,
169    #[error("accept failed")]
170    AcceptFailed,
171    #[error("dial failed")]
172    DialFailed,
173    #[error("peer missing")]
174    PeerMissing,
175}
176
177pub use ingress::{Link, Oracle};
178pub use network::{Config, Network, Receiver, Sender};
179
180#[cfg(test)]
181mod tests {
182    use super::*;
183    use crate::{Manager, Receiver, Recipients, Sender};
184    use bytes::Bytes;
185    use commonware_cryptography::{
186        ed25519::{self, PrivateKey, PublicKey},
187        PrivateKeyExt as _, Signer as _,
188    };
189    use commonware_macros::select;
190    use commonware_runtime::{deterministic, Clock, Metrics, Runner, Spawner};
191    use futures::{channel::mpsc, SinkExt, StreamExt};
192    use rand::Rng;
193    use std::{
194        collections::{BTreeMap, HashMap, HashSet},
195        time::Duration,
196    };
197
198    fn simulate_messages(seed: u64, size: usize) -> (String, Vec<usize>) {
199        let executor = deterministic::Runner::seeded(seed);
200        executor.start(|context| async move {
201            // Create simulated network
202            let (network, mut oracle) = Network::new(
203                context.with_label("network"),
204                Config {
205                    max_size: 1024 * 1024,
206                    disconnect_on_block: true,
207                    tracked_peer_sets: None,
208                },
209            );
210
211            // Start network
212            network.start();
213
214            // Register agents
215            let mut agents = BTreeMap::new();
216            let (seen_sender, mut seen_receiver) = mpsc::channel(1024);
217            for i in 0..size {
218                let pk = PrivateKey::from_seed(i as u64).public_key();
219                let (sender, mut receiver) = oracle.control(pk.clone()).register(0).await.unwrap();
220                agents.insert(pk, sender);
221                let mut agent_sender = seen_sender.clone();
222                context
223                    .with_label("agent_receiver")
224                    .spawn(move |_| async move {
225                        for _ in 0..size {
226                            receiver.recv().await.unwrap();
227                        }
228                        agent_sender.send(i).await.unwrap();
229
230                        // Exiting early here tests the case where the recipient end of an agent is dropped
231                    });
232            }
233
234            // Randomly link agents
235            let only_inbound = PrivateKey::from_seed(0).public_key();
236            for agent in agents.keys() {
237                if agent == &only_inbound {
238                    // Test that we can gracefully handle missing links
239                    continue;
240                }
241                for other in agents.keys() {
242                    let result = oracle
243                        .add_link(
244                            agent.clone(),
245                            other.clone(),
246                            Link {
247                                latency: Duration::from_millis(5),
248                                jitter: Duration::from_millis(2),
249                                success_rate: 0.75,
250                            },
251                        )
252                        .await;
253                    if agent == other {
254                        assert!(matches!(result, Err(Error::LinkingSelf)));
255                    } else {
256                        assert!(result.is_ok());
257                    }
258                }
259            }
260
261            // Send messages
262            context
263                .with_label("agent_sender")
264                .spawn(|mut context| async move {
265                    // Sort agents for deterministic output
266                    let keys = agents.keys().collect::<Vec<_>>();
267
268                    // Send messages
269                    loop {
270                        let index = context.gen_range(0..keys.len());
271                        let sender = keys[index];
272                        let msg = format!("hello from {sender:?}");
273                        let msg = Bytes::from(msg);
274                        let mut message_sender = agents.get(sender).unwrap().clone();
275                        let sent = message_sender
276                            .send(Recipients::All, msg.clone(), false)
277                            .await
278                            .unwrap();
279                        if sender == &only_inbound {
280                            assert_eq!(sent.len(), 0);
281                        } else {
282                            assert_eq!(sent.len(), keys.len() - 1);
283                        }
284                    }
285                });
286
287            // Wait for all recipients
288            let mut results = Vec::new();
289            for _ in 0..size {
290                results.push(seen_receiver.next().await.unwrap());
291            }
292            (context.auditor().state(), results)
293        })
294    }
295
296    fn compare_outputs(seeds: u64, size: usize) {
297        // Collect outputs
298        let mut outputs = Vec::new();
299        for seed in 0..seeds {
300            outputs.push(simulate_messages(seed, size));
301        }
302
303        // Confirm outputs are deterministic
304        for seed in 0..seeds {
305            let output = simulate_messages(seed, size);
306            assert_eq!(output, outputs[seed as usize]);
307        }
308    }
309
310    #[test]
311    fn test_determinism() {
312        compare_outputs(25, 25);
313    }
314
315    #[test]
316    fn test_message_too_big() {
317        let executor = deterministic::Runner::default();
318        executor.start(|mut context| async move {
319            // Create simulated network
320            let (network, oracle) = Network::new(
321                context.with_label("network"),
322                Config {
323                    max_size: 1024 * 1024,
324                    disconnect_on_block: true,
325                    tracked_peer_sets: None,
326                },
327            );
328
329            // Start network
330            network.start();
331
332            // Register agents
333            let mut agents = HashMap::new();
334            for i in 0..10 {
335                let pk = PrivateKey::from_seed(i as u64).public_key();
336                let (sender, _) = oracle.control(pk.clone()).register(0).await.unwrap();
337                agents.insert(pk, sender);
338            }
339
340            // Send invalid message
341            let keys = agents.keys().collect::<Vec<_>>();
342            let index = context.gen_range(0..keys.len());
343            let sender = keys[index];
344            let mut message_sender = agents.get(sender).unwrap().clone();
345            let mut msg = vec![0u8; 1024 * 1024 + 1];
346            context.fill(&mut msg[..]);
347            let result = message_sender
348                .send(Recipients::All, msg.into(), false)
349                .await
350                .unwrap_err();
351
352            // Confirm error is correct
353            assert!(matches!(result, Error::MessageTooLarge(_)));
354        });
355    }
356
357    #[test]
358    fn test_linking_self() {
359        let executor = deterministic::Runner::default();
360        executor.start(|context| async move {
361            // Create simulated network
362            let (network, mut oracle) = Network::new(
363                context.with_label("network"),
364                Config {
365                    max_size: 1024 * 1024,
366                    disconnect_on_block: true,
367                    tracked_peer_sets: None,
368                },
369            );
370
371            // Start network
372            network.start();
373
374            // Register agents
375            let pk = PrivateKey::from_seed(0).public_key();
376            oracle.control(pk.clone()).register(0).await.unwrap();
377
378            // Attempt to link self
379            let result = oracle
380                .add_link(
381                    pk.clone(),
382                    pk,
383                    Link {
384                        latency: Duration::from_millis(5),
385                        jitter: Duration::from_millis(2),
386                        success_rate: 0.75,
387                    },
388                )
389                .await;
390
391            // Confirm error is correct
392            assert!(matches!(result, Err(Error::LinkingSelf)));
393        });
394    }
395
396    #[test]
397    fn test_duplicate_channel() {
398        let executor = deterministic::Runner::default();
399        executor.start(|context| async move {
400            // Create simulated network
401            let (network, oracle) = Network::new(
402                context.with_label("network"),
403                Config {
404                    max_size: 1024 * 1024,
405                    disconnect_on_block: true,
406                    tracked_peer_sets: None,
407                },
408            );
409
410            // Start network
411            network.start();
412
413            // Register agents
414            let pk = PrivateKey::from_seed(0).public_key();
415            oracle.control(pk.clone()).register(0).await.unwrap();
416            let result = oracle.control(pk.clone()).register(0).await;
417
418            // Confirm error is correct
419            assert!(matches!(result, Err(Error::ChannelAlreadyRegistered(0))));
420        });
421    }
422
423    #[test]
424    fn test_invalid_success_rate() {
425        let executor = deterministic::Runner::default();
426        executor.start(|context| async move {
427            // Create simulated network
428            let (network, mut oracle) = Network::new(
429                context.with_label("network"),
430                Config {
431                    max_size: 1024 * 1024,
432                    disconnect_on_block: true,
433                    tracked_peer_sets: None,
434                },
435            );
436
437            // Start network
438            network.start();
439
440            // Register agents
441            let pk1 = PrivateKey::from_seed(0).public_key();
442            let pk2 = PrivateKey::from_seed(1).public_key();
443            oracle.control(pk1.clone()).register(0).await.unwrap();
444            oracle.control(pk2.clone()).register(0).await.unwrap();
445
446            // Attempt to link with invalid success rate
447            let result = oracle
448                .add_link(
449                    pk1,
450                    pk2,
451                    Link {
452                        latency: Duration::from_millis(5),
453                        jitter: Duration::from_millis(2),
454                        success_rate: 1.5,
455                    },
456                )
457                .await;
458
459            // Confirm error is correct
460            assert!(matches!(result, Err(Error::InvalidSuccessRate(_))));
461        });
462    }
463
464    #[test]
465    fn test_simple_message_delivery() {
466        let executor = deterministic::Runner::default();
467        executor.start(|context| async move {
468            // Create simulated network
469            let (network, mut oracle) = Network::new(
470                context.with_label("network"),
471                Config {
472                    max_size: 1024 * 1024,
473                    disconnect_on_block: true,
474                    tracked_peer_sets: None,
475                },
476            );
477
478            // Start network
479            network.start();
480
481            // Register agents
482            let pk1 = PrivateKey::from_seed(0).public_key();
483            let pk2 = PrivateKey::from_seed(1).public_key();
484            let (mut sender1, mut receiver1) =
485                oracle.control(pk1.clone()).register(0).await.unwrap();
486            let (mut sender2, mut receiver2) =
487                oracle.control(pk2.clone()).register(0).await.unwrap();
488
489            // Register unused channels
490            let _ = oracle.control(pk1.clone()).register(1).await.unwrap();
491            let _ = oracle.control(pk2.clone()).register(2).await.unwrap();
492
493            // Link agents
494            oracle
495                .add_link(
496                    pk1.clone(),
497                    pk2.clone(),
498                    Link {
499                        latency: Duration::from_millis(5),
500                        jitter: Duration::from_millis(2),
501                        success_rate: 1.0,
502                    },
503                )
504                .await
505                .unwrap();
506            oracle
507                .add_link(
508                    pk2.clone(),
509                    pk1.clone(),
510                    Link {
511                        latency: Duration::from_millis(5),
512                        jitter: Duration::from_millis(2),
513                        success_rate: 1.0,
514                    },
515                )
516                .await
517                .unwrap();
518
519            // Send messages
520            let msg1 = Bytes::from("hello from pk1");
521            let msg2 = Bytes::from("hello from pk2");
522            sender1
523                .send(Recipients::One(pk2.clone()), msg1.clone(), false)
524                .await
525                .unwrap();
526            sender2
527                .send(Recipients::One(pk1.clone()), msg2.clone(), false)
528                .await
529                .unwrap();
530
531            // Confirm message delivery
532            let (sender, message) = receiver1.recv().await.unwrap();
533            assert_eq!(sender, pk2);
534            assert_eq!(message, msg2);
535            let (sender, message) = receiver2.recv().await.unwrap();
536            assert_eq!(sender, pk1);
537            assert_eq!(message, msg1);
538        });
539    }
540
541    #[test]
542    fn test_send_wrong_channel() {
543        let executor = deterministic::Runner::default();
544        executor.start(|context| async move {
545            // Create simulated network
546            let (network, mut oracle) = Network::new(
547                context.with_label("network"),
548                Config {
549                    max_size: 1024 * 1024,
550                    disconnect_on_block: true,
551                    tracked_peer_sets: None,
552                },
553            );
554
555            // Start network
556            network.start();
557
558            // Register agents
559            let pk1 = PrivateKey::from_seed(0).public_key();
560            let pk2 = PrivateKey::from_seed(1).public_key();
561            let (mut sender1, _) = oracle.control(pk1.clone()).register(0).await.unwrap();
562            let (_, mut receiver2) = oracle.control(pk2.clone()).register(1).await.unwrap();
563
564            // Link agents
565            oracle
566                .add_link(
567                    pk1,
568                    pk2.clone(),
569                    Link {
570                        latency: Duration::from_millis(5),
571                        jitter: Duration::ZERO,
572                        success_rate: 1.0,
573                    },
574                )
575                .await
576                .unwrap();
577
578            // Send message
579            let msg = Bytes::from("hello from pk1");
580            sender1
581                .send(Recipients::One(pk2), msg, false)
582                .await
583                .unwrap();
584
585            // Confirm no message delivery
586            select! {
587                _ = receiver2.recv() => {
588                    panic!("unexpected message");
589                },
590                _ = context.sleep(Duration::from_secs(1)) => {},
591            }
592        });
593    }
594
595    #[test]
596    fn test_dynamic_peers() {
597        let executor = deterministic::Runner::default();
598        executor.start(|context| async move {
599            // Create simulated network
600            let (network, mut oracle) = Network::new(
601                context.with_label("network"),
602                Config {
603                    max_size: 1024 * 1024,
604                    disconnect_on_block: true,
605                    tracked_peer_sets: None,
606                },
607            );
608
609            // Start network
610            network.start();
611
612            // Define agents
613            let pk1 = PrivateKey::from_seed(0).public_key();
614            let pk2 = PrivateKey::from_seed(1).public_key();
615            let (mut sender1, mut receiver1) =
616                oracle.control(pk1.clone()).register(0).await.unwrap();
617            let (mut sender2, mut receiver2) =
618                oracle.control(pk2.clone()).register(0).await.unwrap();
619
620            // Link agents
621            oracle
622                .add_link(
623                    pk1.clone(),
624                    pk2.clone(),
625                    Link {
626                        latency: Duration::from_millis(5),
627                        jitter: Duration::from_millis(2),
628                        success_rate: 1.0,
629                    },
630                )
631                .await
632                .unwrap();
633            oracle
634                .add_link(
635                    pk2.clone(),
636                    pk1.clone(),
637                    Link {
638                        latency: Duration::from_millis(5),
639                        jitter: Duration::from_millis(2),
640                        success_rate: 1.0,
641                    },
642                )
643                .await
644                .unwrap();
645
646            // Send messages
647            let msg1 = Bytes::from("attempt 1: hello from pk1");
648            let msg2 = Bytes::from("attempt 1: hello from pk2");
649            sender1
650                .send(Recipients::One(pk2.clone()), msg1.clone(), false)
651                .await
652                .unwrap();
653            sender2
654                .send(Recipients::One(pk1.clone()), msg2.clone(), false)
655                .await
656                .unwrap();
657
658            // Confirm message delivery
659            let (sender, message) = receiver1.recv().await.unwrap();
660            assert_eq!(sender, pk2);
661            assert_eq!(message, msg2);
662            let (sender, message) = receiver2.recv().await.unwrap();
663            assert_eq!(sender, pk1);
664            assert_eq!(message, msg1);
665        });
666    }
667
668    #[test]
669    fn test_dynamic_links() {
670        let executor = deterministic::Runner::default();
671        executor.start(|context| async move {
672            // Create simulated network
673            let (network, mut oracle) = Network::new(
674                context.with_label("network"),
675                Config {
676                    max_size: 1024 * 1024,
677                    disconnect_on_block: true,
678                    tracked_peer_sets: None,
679                },
680            );
681
682            // Start network
683            network.start();
684
685            // Register agents
686            let pk1 = PrivateKey::from_seed(0).public_key();
687            let pk2 = PrivateKey::from_seed(1).public_key();
688            let (mut sender1, mut receiver1) =
689                oracle.control(pk1.clone()).register(0).await.unwrap();
690            let (mut sender2, mut receiver2) =
691                oracle.control(pk2.clone()).register(0).await.unwrap();
692
693            // Send messages
694            let msg1 = Bytes::from("attempt 1: hello from pk1");
695            let msg2 = Bytes::from("attempt 1: hello from pk2");
696            sender1
697                .send(Recipients::One(pk2.clone()), msg1.clone(), false)
698                .await
699                .unwrap();
700            sender2
701                .send(Recipients::One(pk1.clone()), msg2.clone(), false)
702                .await
703                .unwrap();
704
705            // Confirm no message delivery
706            select! {
707                _ = receiver1.recv() => {
708                    panic!("unexpected message");
709                },
710                _ = receiver2.recv() => {
711                    panic!("unexpected message");
712                },
713                _ = context.sleep(Duration::from_secs(1)) => {},
714            }
715
716            // Link agents
717            oracle
718                .add_link(
719                    pk1.clone(),
720                    pk2.clone(),
721                    Link {
722                        latency: Duration::from_millis(5),
723                        jitter: Duration::from_millis(2),
724                        success_rate: 1.0,
725                    },
726                )
727                .await
728                .unwrap();
729            oracle
730                .add_link(
731                    pk2.clone(),
732                    pk1.clone(),
733                    Link {
734                        latency: Duration::from_millis(5),
735                        jitter: Duration::from_millis(2),
736                        success_rate: 1.0,
737                    },
738                )
739                .await
740                .unwrap();
741
742            // Send messages
743            let msg1 = Bytes::from("attempt 2: hello from pk1");
744            let msg2 = Bytes::from("attempt 2: hello from pk2");
745            sender1
746                .send(Recipients::One(pk2.clone()), msg1.clone(), false)
747                .await
748                .unwrap();
749            sender2
750                .send(Recipients::One(pk1.clone()), msg2.clone(), false)
751                .await
752                .unwrap();
753
754            // Confirm message delivery
755            let (sender, message) = receiver1.recv().await.unwrap();
756            assert_eq!(sender, pk2);
757            assert_eq!(message, msg2);
758            let (sender, message) = receiver2.recv().await.unwrap();
759            assert_eq!(sender, pk1);
760            assert_eq!(message, msg1);
761
762            // Remove links
763            oracle.remove_link(pk1.clone(), pk2.clone()).await.unwrap();
764            oracle.remove_link(pk2.clone(), pk1.clone()).await.unwrap();
765
766            // Send messages
767            let msg1 = Bytes::from("attempt 3: hello from pk1");
768            let msg2 = Bytes::from("attempt 3: hello from pk2");
769            sender1
770                .send(Recipients::One(pk2.clone()), msg1.clone(), false)
771                .await
772                .unwrap();
773            sender2
774                .send(Recipients::One(pk1.clone()), msg2.clone(), false)
775                .await
776                .unwrap();
777
778            // Confirm no message delivery
779            select! {
780                _ = receiver1.recv() => {
781                    panic!("unexpected message");
782                },
783                _ = receiver2.recv() => {
784                    panic!("unexpected message");
785                },
786                _ = context.sleep(Duration::from_secs(1)) => {},
787            }
788
789            // Remove non-existent links
790            let result = oracle.remove_link(pk1, pk2).await;
791            assert!(matches!(result, Err(Error::LinkMissing)));
792        });
793    }
794
795    async fn test_bandwidth_between_peers(
796        context: &mut deterministic::Context,
797        oracle: &mut Oracle<PublicKey>,
798        sender_bps: Option<usize>,
799        receiver_bps: Option<usize>,
800        message_size: usize,
801        expected_duration_ms: u64,
802    ) {
803        // Create two agents
804        let pk1 = PrivateKey::from_seed(context.gen::<u64>()).public_key();
805        let pk2 = PrivateKey::from_seed(context.gen::<u64>()).public_key();
806        let (mut sender, _) = oracle.control(pk1.clone()).register(0).await.unwrap();
807        let (_, mut receiver) = oracle.control(pk2.clone()).register(0).await.unwrap();
808
809        // Set bandwidth limits
810        oracle
811            .limit_bandwidth(pk1.clone(), sender_bps, None)
812            .await
813            .unwrap();
814        oracle
815            .limit_bandwidth(pk2.clone(), None, receiver_bps)
816            .await
817            .unwrap();
818
819        // Link the two agents
820        oracle
821            .add_link(
822                pk1.clone(),
823                pk2.clone(),
824                Link {
825                    // No latency so it doesn't interfere with bandwidth delay calculation
826                    latency: Duration::ZERO,
827                    jitter: Duration::ZERO,
828                    success_rate: 1.0,
829                },
830            )
831            .await
832            .unwrap();
833
834        // Send a message from agent 1 to 2
835        let msg = Bytes::from(vec![42u8; message_size]);
836        let start = context.current();
837        sender
838            .send(Recipients::One(pk2.clone()), msg.clone(), true)
839            .await
840            .unwrap();
841
842        // Measure how long it takes for agent 2 to receive the message
843        let (origin, received) = receiver.recv().await.unwrap();
844        let elapsed = context.current().duration_since(start).unwrap();
845
846        assert_eq!(origin, pk1);
847        assert_eq!(received, msg);
848        assert!(
849            elapsed >= Duration::from_millis(expected_duration_ms),
850            "Message arrived too quickly: {elapsed:?} (expected >= {expected_duration_ms}ms)"
851        );
852        assert!(
853            elapsed < Duration::from_millis(expected_duration_ms + 100),
854            "Message took too long: {elapsed:?} (expected ~{expected_duration_ms}ms)"
855        );
856    }
857
858    #[test]
859    fn test_bandwidth() {
860        let executor = deterministic::Runner::default();
861        executor.start(|mut context| async move {
862            let (network, mut oracle) = Network::new(
863                context.with_label("network"),
864                Config {
865                    max_size: 1024 * 1024,
866                    disconnect_on_block: true,
867                    tracked_peer_sets: None,
868                },
869            );
870            network.start();
871
872            // Both sender and receiver have the same bandiwdth (1000 B/s)
873            // 500 bytes at 1000 B/s = 0.5 seconds
874            test_bandwidth_between_peers(
875                &mut context,
876                &mut oracle,
877                Some(1000), // sender egress
878                Some(1000), // receiver ingress
879                500,        // message size
880                500,        // expected duration in ms
881            )
882            .await;
883
884            // Sender has lower bandwidth (500 B/s) than receiver (2000 B/s)
885            // Should be limited by sender's 500 B/s
886            // 250 bytes at 500 B/s = 0.5 seconds
887            test_bandwidth_between_peers(
888                &mut context,
889                &mut oracle,
890                Some(500),  // sender egress
891                Some(2000), // receiver ingress
892                250,        // message size
893                500,        // expected duration in ms
894            )
895            .await;
896
897            // Sender has higher bandwidth (2000 B/s) than receiver (500 B/s)
898            // Should be limited by receiver's 500 B/s
899            // 250 bytes at 500 B/s = 0.5 seconds
900            test_bandwidth_between_peers(
901                &mut context,
902                &mut oracle,
903                Some(2000), // sender egress
904                Some(500),  // receiver ingress
905                250,        // message size
906                500,        // expected duration in ms
907            )
908            .await;
909
910            // Unlimited sender, limited receiver
911            // Should be limited by receiver's 1000 B/s
912            // 500 bytes at 1000 B/s = 0.5 seconds
913            test_bandwidth_between_peers(
914                &mut context,
915                &mut oracle,
916                None,       // sender egress (unlimited)
917                Some(1000), // receiver ingress
918                500,        // message size
919                500,        // expected duration in ms
920            )
921            .await;
922
923            // Limited sender, unlimited receiver
924            // Should be limited by sender's 1000 B/s
925            // 500 bytes at 1000 B/s = 0.5 seconds
926            test_bandwidth_between_peers(
927                &mut context,
928                &mut oracle,
929                Some(1000), // sender egress
930                None,       // receiver ingress (unlimited)
931                500,        // message size
932                500,        // expected duration in ms
933            )
934            .await;
935
936            // Unlimited sender, unlimited receiver
937            // Delivery should be (almost) instant
938            test_bandwidth_between_peers(
939                &mut context,
940                &mut oracle,
941                None, // sender egress (unlimited)
942                None, // receiver ingress (unlimited)
943                500,  // message size
944                0,    // expected duration in ms
945            )
946            .await;
947        });
948    }
949
950    #[test]
951    fn test_bandwidth_contention() {
952        // Test bandwidth contention with many peers (one-to-many and many-to-one scenarios)
953        let executor = deterministic::Runner::default();
954        executor.start(|context| async move {
955            let (network, mut oracle) = Network::new(
956                context.with_label("network"),
957                Config {
958                    max_size: 1024 * 1024,
959                    disconnect_on_block: true,
960                    tracked_peer_sets: None,
961                },
962            );
963            network.start();
964
965            // Configuration
966            const NUM_PEERS: usize = 100;
967            const MESSAGE_SIZE: usize = 1000; // 1KB per message
968            const EFFECTIVE_BPS: usize = 10_000; // 10KB/s egress/ingress per peer
969
970            // Create peers
971            let mut peers = Vec::with_capacity(NUM_PEERS + 1);
972            let mut senders = Vec::with_capacity(NUM_PEERS + 1);
973            let mut receivers = Vec::with_capacity(NUM_PEERS + 1);
974
975            // Create the main peer (index 0) and 100 other peers
976            for i in 0..=NUM_PEERS {
977                let pk = PrivateKey::from_seed(i as u64).public_key();
978                let (sender, receiver) = oracle.control(pk.clone()).register(0).await.unwrap();
979                peers.push(pk);
980                senders.push(sender);
981                receivers.push(receiver);
982            }
983
984            // Set bandwidth limits for all peers
985            for pk in &peers {
986                oracle
987                    .limit_bandwidth(pk.clone(), Some(EFFECTIVE_BPS), Some(EFFECTIVE_BPS))
988                    .await
989                    .unwrap();
990            }
991
992            // Link all peers to the main peer (peers[0]) with zero latency
993            for peer in peers.iter().skip(1) {
994                oracle
995                    .add_link(
996                        peer.clone(),
997                        peers[0].clone(),
998                        Link {
999                            latency: Duration::ZERO,
1000                            jitter: Duration::ZERO,
1001                            success_rate: 1.0,
1002                        },
1003                    )
1004                    .await
1005                    .unwrap();
1006                oracle
1007                    .add_link(
1008                        peers[0].clone(),
1009                        peer.clone(),
1010                        Link {
1011                            latency: Duration::ZERO,
1012                            jitter: Duration::ZERO,
1013                            success_rate: 1.0,
1014                        },
1015                    )
1016                    .await
1017                    .unwrap();
1018            }
1019
1020            // One-to-many (main peer sends to all others). Verifies that bandwidth limits
1021            // are properly enforced when sending to multiple recipients
1022            let start = context.current();
1023
1024            // Send message to all peers concurrently
1025            // and wait for all sends to be acknowledged
1026            let msg = Bytes::from(vec![0u8; MESSAGE_SIZE]);
1027            for peer in peers.iter().skip(1) {
1028                senders[0]
1029                    .send(Recipients::One(peer.clone()), msg.clone(), true)
1030                    .await
1031                    .unwrap();
1032            }
1033
1034            // Verify all messages are received
1035            for receiver in receivers.iter_mut().skip(1) {
1036                let (origin, received) = receiver.recv().await.unwrap();
1037                assert_eq!(origin, peers[0]);
1038                assert_eq!(received.len(), MESSAGE_SIZE);
1039            }
1040
1041            let elapsed = context.current().duration_since(start).unwrap();
1042
1043            // Calculate expected time
1044            let expected_ms = (NUM_PEERS * MESSAGE_SIZE * 1000) / EFFECTIVE_BPS;
1045
1046            assert!(
1047                elapsed >= Duration::from_millis(expected_ms as u64),
1048                "One-to-many completed too quickly: {elapsed:?} (expected >= {expected_ms}ms)"
1049            );
1050            assert!(
1051                elapsed < Duration::from_millis((expected_ms as u64) + 500),
1052                "One-to-many took too long: {elapsed:?} (expected ~{expected_ms}ms)"
1053            );
1054
1055            // Many-to-one (all peers send to the main peer)
1056            let start = context.current();
1057
1058            // Each peer sends a message to the main peer concurrently and we wait for all
1059            // sends to be acknowledged
1060            let msg = Bytes::from(vec![0; MESSAGE_SIZE]);
1061            for mut sender in senders.into_iter().skip(1) {
1062                sender
1063                    .send(Recipients::One(peers[0].clone()), msg.clone(), true)
1064                    .await
1065                    .unwrap();
1066            }
1067
1068            // Collect all messages at the main peer
1069            let mut received_from = HashSet::new();
1070            for _ in 1..=NUM_PEERS {
1071                let (origin, received) = receivers[0].recv().await.unwrap();
1072                assert_eq!(received.len(), MESSAGE_SIZE);
1073                assert!(
1074                    received_from.insert(origin.clone()),
1075                    "Received duplicate from {origin:?}"
1076                );
1077            }
1078
1079            let elapsed = context.current().duration_since(start).unwrap();
1080
1081            // Calculate expected time
1082            let expected_ms = (NUM_PEERS * MESSAGE_SIZE * 1000) / EFFECTIVE_BPS;
1083
1084            assert!(
1085                elapsed >= Duration::from_millis(expected_ms as u64),
1086                "Many-to-one completed too quickly: {elapsed:?} (expected >= {expected_ms}ms)"
1087            );
1088            assert!(
1089                elapsed < Duration::from_millis((expected_ms as u64) + 500),
1090                "Many-to-one took too long: {elapsed:?} (expected ~{expected_ms}ms)"
1091            );
1092
1093            // Verify we received from all peers
1094            assert_eq!(received_from.len(), NUM_PEERS);
1095            for peer in peers.iter().skip(1) {
1096                assert!(received_from.contains(peer));
1097            }
1098        });
1099    }
1100
1101    #[test]
1102    fn test_message_ordering() {
1103        // Test that messages arrive in order even with variable latency
1104        let executor = deterministic::Runner::default();
1105        executor.start(|context| async move {
1106            let (network, mut oracle) = Network::new(
1107                context.with_label("network"),
1108                Config {
1109                    max_size: 1024 * 1024,
1110                    disconnect_on_block: true,
1111                    tracked_peer_sets: None,
1112                },
1113            );
1114            network.start();
1115
1116            // Register agents
1117            let pk1 = PrivateKey::from_seed(1).public_key();
1118            let pk2 = PrivateKey::from_seed(2).public_key();
1119            let (mut sender, _) = oracle.control(pk1.clone()).register(0).await.unwrap();
1120            let (_, mut receiver) = oracle.control(pk2.clone()).register(0).await.unwrap();
1121
1122            // Link agents with high jitter to create variable delays
1123            oracle
1124                .add_link(
1125                    pk1.clone(),
1126                    pk2.clone(),
1127                    Link {
1128                        latency: Duration::from_millis(50),
1129                        jitter: Duration::from_millis(40),
1130                        success_rate: 1.0,
1131                    },
1132                )
1133                .await
1134                .unwrap();
1135
1136            // Send multiple messages that should arrive in order
1137            let messages = vec![
1138                Bytes::from("message 1"),
1139                Bytes::from("message 2"),
1140                Bytes::from("message 3"),
1141                Bytes::from("message 4"),
1142                Bytes::from("message 5"),
1143            ];
1144
1145            for msg in messages.clone() {
1146                sender
1147                    .send(Recipients::One(pk2.clone()), msg, true)
1148                    .await
1149                    .unwrap();
1150            }
1151
1152            // Receive messages and verify they arrive in order
1153            for expected_msg in messages {
1154                let (origin, received_msg) = receiver.recv().await.unwrap();
1155                assert_eq!(origin, pk1);
1156                assert_eq!(received_msg, expected_msg);
1157            }
1158        })
1159    }
1160
1161    #[test]
1162    fn test_high_latency_message_blocks_followup() {
1163        let executor = deterministic::Runner::default();
1164        executor.start(|context| async move {
1165            let (network, mut oracle) = Network::new(
1166                context.with_label("network"),
1167                Config {
1168                    max_size: 1024 * 1024,
1169                    disconnect_on_block: true,
1170                    tracked_peer_sets: None,
1171                },
1172            );
1173            network.start();
1174
1175            let pk1 = PrivateKey::from_seed(1).public_key();
1176            let pk2 = PrivateKey::from_seed(2).public_key();
1177            let (mut sender, _) = oracle.control(pk1.clone()).register(0).await.unwrap();
1178            let (_, mut receiver) = oracle.control(pk2.clone()).register(0).await.unwrap();
1179
1180            const BPS: usize = 1_000;
1181            oracle
1182                .limit_bandwidth(pk1.clone(), Some(BPS), None)
1183                .await
1184                .unwrap();
1185            oracle
1186                .limit_bandwidth(pk2.clone(), None, Some(BPS))
1187                .await
1188                .unwrap();
1189
1190            // Send slow message
1191            oracle
1192                .add_link(
1193                    pk1.clone(),
1194                    pk2.clone(),
1195                    Link {
1196                        latency: Duration::from_millis(5_000),
1197                        jitter: Duration::ZERO,
1198                        success_rate: 1.0,
1199                    },
1200                )
1201                .await
1202                .unwrap();
1203
1204            let slow = Bytes::from(vec![0u8; 1_000]);
1205            sender
1206                .send(Recipients::One(pk2.clone()), slow.clone(), true)
1207                .await
1208                .unwrap();
1209
1210            // Update link
1211            oracle.remove_link(pk1.clone(), pk2.clone()).await.unwrap();
1212            oracle
1213                .add_link(
1214                    pk1.clone(),
1215                    pk2.clone(),
1216                    Link {
1217                        latency: Duration::from_millis(1),
1218                        jitter: Duration::ZERO,
1219                        success_rate: 1.0,
1220                    },
1221                )
1222                .await
1223                .unwrap();
1224
1225            // Send fast message
1226            let fast = Bytes::from(vec![1u8; 1_000]);
1227            sender
1228                .send(Recipients::One(pk2.clone()), fast.clone(), true)
1229                .await
1230                .unwrap();
1231
1232            let start = context.current();
1233            let (origin1, message1) = receiver.recv().await.unwrap();
1234            assert_eq!(origin1, pk1);
1235            assert_eq!(message1, slow);
1236            let first_elapsed = context.current().duration_since(start).unwrap();
1237
1238            let (origin2, message2) = receiver.recv().await.unwrap();
1239            let second_elapsed = context.current().duration_since(start).unwrap();
1240            assert_eq!(origin2, pk1);
1241            assert_eq!(message2, fast);
1242
1243            let egress_time = Duration::from_secs(1);
1244            let slow_latency = Duration::from_millis(5_000);
1245            let expected_first = egress_time + slow_latency;
1246            let tolerance = Duration::from_millis(10);
1247            assert!(
1248                first_elapsed >= expected_first.saturating_sub(tolerance)
1249                    && first_elapsed <= expected_first + tolerance,
1250                "slow message arrived outside expected window: {first_elapsed:?} (expected {expected_first:?} ± {tolerance:?})"
1251            );
1252            assert!(
1253                second_elapsed >= first_elapsed,
1254                "fast message arrived before slow transmission completed"
1255            );
1256
1257            let arrival_gap = second_elapsed
1258                .checked_sub(first_elapsed)
1259                .expect("timestamps ordered");
1260            assert!(
1261                arrival_gap >= egress_time.saturating_sub(tolerance)
1262                    && arrival_gap <= egress_time + tolerance,
1263                "next arrival deviated from transmit duration (gap = {arrival_gap:?}, expected {egress_time:?} ± {tolerance:?})"
1264            );
1265        })
1266    }
1267
1268    #[test]
1269    fn test_many_to_one_bandwidth_sharing() {
1270        let executor = deterministic::Runner::default();
1271        executor.start(|context| async move {
1272            let (network, mut oracle) = Network::new(
1273                context.with_label("network"),
1274                Config {
1275                    max_size: 1024 * 1024,
1276                    disconnect_on_block: true,
1277                    tracked_peer_sets: None,
1278                },
1279            );
1280            network.start();
1281
1282            // Create 10 senders and 1 receiver
1283            let mut senders = Vec::new();
1284            let mut sender_txs = Vec::new();
1285            for i in 0..10 {
1286                let sender = ed25519::PrivateKey::from_seed(i).public_key();
1287                senders.push(sender.clone());
1288                let (tx, _) = oracle.control(sender.clone()).register(0).await.unwrap();
1289                sender_txs.push(tx);
1290
1291                // Each sender has 10KB/s egress
1292                oracle
1293                    .limit_bandwidth(sender.clone(), Some(10_000), None)
1294                    .await
1295                    .unwrap();
1296            }
1297
1298            let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1299            let (_, mut receiver_rx) = oracle.control(receiver.clone()).register(0).await.unwrap();
1300
1301            // Receiver has 100KB/s ingress
1302            oracle
1303                .limit_bandwidth(receiver.clone(), None, Some(100_000))
1304                .await
1305                .unwrap();
1306
1307            // Add links with no latency
1308            for sender in &senders {
1309                oracle
1310                    .add_link(
1311                        sender.clone(),
1312                        receiver.clone(),
1313                        Link {
1314                            latency: Duration::ZERO,
1315                            jitter: Duration::ZERO,
1316                            success_rate: 1.0,
1317                        },
1318                    )
1319                    .await
1320                    .unwrap();
1321            }
1322
1323            let start = context.current();
1324
1325            // All senders send 10KB simultaneously
1326            for (i, mut tx) in sender_txs.into_iter().enumerate() {
1327                let receiver_clone = receiver.clone();
1328                let msg = Bytes::from(vec![i as u8; 10_000]);
1329                tx.send(Recipients::One(receiver_clone), msg, true)
1330                    .await
1331                    .unwrap();
1332            }
1333
1334            // All 10 messages should be received at ~1s
1335            // (100KB total data at 100KB/s aggregate bandwidth)
1336            for i in 0..10 {
1337                let (_, _msg) = receiver_rx.recv().await.unwrap();
1338                let recv_time = context.current().duration_since(start).unwrap();
1339
1340                // Messages should all complete around 1s
1341                assert!(
1342                    recv_time >= Duration::from_millis(950)
1343                        && recv_time <= Duration::from_millis(1100),
1344                    "Message {i} received at {recv_time:?}, expected ~1s",
1345                );
1346            }
1347        });
1348    }
1349
1350    #[test]
1351    fn test_one_to_many_fast_sender() {
1352        // Test that 1 fast sender (100KB/s) sending to 10 receivers (10KB/s each)
1353        // should complete all sends in ~1s and all messages received in ~1s
1354        let executor = deterministic::Runner::default();
1355        executor.start(|context| async move {
1356            let (network, mut oracle) = Network::new(
1357                context.with_label("network"),
1358                Config {
1359                    max_size: 1024 * 1024,
1360                    disconnect_on_block: true,
1361                    tracked_peer_sets: None,
1362                },
1363            );
1364            network.start();
1365
1366            // Create fast sender
1367            let sender = ed25519::PrivateKey::from_seed(0).public_key();
1368            let (sender_tx, _) = oracle.control(sender.clone()).register(0).await.unwrap();
1369
1370            // Sender has 100KB/s egress
1371            oracle
1372                .limit_bandwidth(sender.clone(), Some(100_000), None)
1373                .await
1374                .unwrap();
1375
1376            // Create 10 receivers
1377            let mut receivers = Vec::new();
1378            let mut receiver_rxs = Vec::new();
1379            for i in 0..10 {
1380                let receiver = ed25519::PrivateKey::from_seed(i + 1).public_key();
1381                receivers.push(receiver.clone());
1382                let (_, rx) = oracle.control(receiver.clone()).register(0).await.unwrap();
1383                receiver_rxs.push(rx);
1384
1385                // Each receiver has 10KB/s ingress
1386                oracle
1387                    .limit_bandwidth(receiver.clone(), None, Some(10_000))
1388                    .await
1389                    .unwrap();
1390
1391                // Add link with no latency
1392                oracle
1393                    .add_link(
1394                        sender.clone(),
1395                        receiver.clone(),
1396                        Link {
1397                            latency: Duration::ZERO,
1398                            jitter: Duration::ZERO,
1399                            success_rate: 1.0,
1400                        },
1401                    )
1402                    .await
1403                    .unwrap();
1404            }
1405
1406            let start = context.current();
1407
1408            // Send 10KB to each receiver (100KB total)
1409            for (i, receiver) in receivers.iter().enumerate() {
1410                let mut sender_tx = sender_tx.clone();
1411                let receiver_clone = receiver.clone();
1412                let msg = Bytes::from(vec![i as u8; 10_000]);
1413                sender_tx
1414                    .send(Recipients::One(receiver_clone), msg, true)
1415                    .await
1416                    .unwrap();
1417            }
1418
1419            // Each receiver should receive their 10KB message in ~1s (10KB at 10KB/s)
1420            for (i, mut rx) in receiver_rxs.into_iter().enumerate() {
1421                let (_, msg) = rx.recv().await.unwrap();
1422                assert_eq!(msg[0], i as u8);
1423                let recv_time = context.current().duration_since(start).unwrap();
1424
1425                // All messages should be received around 1s
1426                assert!(
1427                    recv_time >= Duration::from_millis(950)
1428                        && recv_time <= Duration::from_millis(1100),
1429                    "Receiver {i} received at {recv_time:?}, expected ~1s",
1430                );
1431            }
1432        });
1433    }
1434
1435    #[test]
1436    fn test_many_slow_senders_to_fast_receiver() {
1437        // Test that 10 slow senders (1KB/s each) sending to a fast receiver (10KB/s)
1438        // should complete all transfers in ~1s
1439        let executor = deterministic::Runner::default();
1440        executor.start(|context| async move {
1441            let (network, mut oracle) = Network::new(
1442                context.with_label("network"),
1443                Config {
1444                    max_size: 1024 * 1024,
1445                    disconnect_on_block: true,
1446                    tracked_peer_sets: None,
1447                },
1448            );
1449            network.start();
1450
1451            // Create 10 slow senders
1452            let mut senders = Vec::new();
1453            let mut sender_txs = Vec::new();
1454            for i in 0..10 {
1455                let sender = ed25519::PrivateKey::from_seed(i).public_key();
1456                senders.push(sender.clone());
1457                let (tx, _) = oracle.control(sender.clone()).register(0).await.unwrap();
1458                sender_txs.push(tx);
1459
1460                // Each sender has 1KB/s egress (slow)
1461                oracle
1462                    .limit_bandwidth(sender.clone(), Some(1_000), None)
1463                    .await
1464                    .unwrap();
1465            }
1466
1467            // Create fast receiver
1468            let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1469            let (_, mut receiver_rx) = oracle.control(receiver.clone()).register(0).await.unwrap();
1470
1471            // Receiver has 10KB/s ingress (can handle all 10 senders at full speed)
1472            oracle
1473                .limit_bandwidth(receiver.clone(), None, Some(10_000))
1474                .await
1475                .unwrap();
1476
1477            // Add links with no latency
1478            for sender in &senders {
1479                oracle
1480                    .add_link(
1481                        sender.clone(),
1482                        receiver.clone(),
1483                        Link {
1484                            latency: Duration::ZERO,
1485                            jitter: Duration::ZERO,
1486                            success_rate: 1.0,
1487                        },
1488                    )
1489                    .await
1490                    .unwrap();
1491            }
1492
1493            let start = context.current();
1494
1495            // All senders send 1KB simultaneously
1496            for (i, mut tx) in sender_txs.into_iter().enumerate() {
1497                let receiver_clone = receiver.clone();
1498                let msg = Bytes::from(vec![i as u8; 1_000]);
1499                tx.send(Recipients::One(receiver_clone), msg, true)
1500                    .await
1501                    .unwrap();
1502            }
1503
1504            // Each sender takes 1s to transmit 1KB at 1KB/s
1505            // All transmissions happen in parallel, so total send time is ~1s
1506
1507            // All 10 messages (10KB total) should be received at ~1s
1508            // Receiver processes at 10KB/s, can handle all 10KB in 1s
1509            for i in 0..10 {
1510                let (_, _msg) = receiver_rx.recv().await.unwrap();
1511                let recv_time = context.current().duration_since(start).unwrap();
1512
1513                // All messages should complete around 1s
1514                assert!(
1515                    recv_time >= Duration::from_millis(950)
1516                        && recv_time <= Duration::from_millis(1100),
1517                    "Message {i} received at {recv_time:?}, expected ~1s",
1518                );
1519            }
1520        });
1521    }
1522
1523    #[test]
1524    fn test_dynamic_bandwidth_allocation_staggered() {
1525        // Test that bandwidth is dynamically allocated as
1526        // transfers start and complete at different times
1527        //
1528        // 3 senders to 1 receiver, starting at different times
1529        // Receiver has 30KB/s, senders each have 30KB/s
1530        let executor = deterministic::Runner::default();
1531        executor.start(|context| async move {
1532            let (network, mut oracle) = Network::new(
1533                context.with_label("network"),
1534                Config {
1535                    max_size: 1024 * 1024,
1536                    disconnect_on_block: true,
1537                    tracked_peer_sets: None,
1538                },
1539            );
1540            network.start();
1541
1542            // Create 3 senders
1543            let mut senders = Vec::new();
1544            let mut sender_txs = Vec::new();
1545            for i in 0..3 {
1546                let sender = ed25519::PrivateKey::from_seed(i).public_key();
1547                senders.push(sender.clone());
1548                let (tx, _) = oracle.control(sender.clone()).register(0).await.unwrap();
1549                sender_txs.push(tx);
1550
1551                // Each sender has 30KB/s egress
1552                oracle
1553                    .limit_bandwidth(sender.clone(), Some(30_000), None)
1554                    .await
1555                    .unwrap();
1556            }
1557
1558            // Create receiver with 30KB/s ingress
1559            let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1560            let (_, mut receiver_rx) = oracle.control(receiver.clone()).register(0).await.unwrap();
1561            oracle
1562                .limit_bandwidth(receiver.clone(), None, Some(30_000))
1563                .await
1564                .unwrap();
1565
1566            // Add links with minimal latency
1567            for sender in &senders {
1568                oracle
1569                    .add_link(
1570                        sender.clone(),
1571                        receiver.clone(),
1572                        Link {
1573                            latency: Duration::from_millis(1),
1574                            jitter: Duration::ZERO,
1575                            success_rate: 1.0,
1576                        },
1577                    )
1578                    .await
1579                    .unwrap();
1580            }
1581
1582            let start = context.current();
1583
1584            // Sender 0: sends 30KB at t=0
1585            // Gets full 30KB/s for the first 0.5s, then shares with sender 1
1586            // at 15KB/s until completion at t=1.5s
1587            let mut tx0 = sender_txs[0].clone();
1588            let rx_clone = receiver.clone();
1589            context.clone().spawn(move |_| async move {
1590                let msg = Bytes::from(vec![0u8; 30_000]);
1591                tx0.send(Recipients::One(rx_clone), msg, true)
1592                    .await
1593                    .unwrap();
1594            });
1595
1596            // Sender 1: sends 30KB at t=0.5s
1597            // Shares bandwidth with sender 0 (15KB/s each) until t=1.5s,
1598            // then gets the full 30KB/s
1599            let mut tx1 = sender_txs[1].clone();
1600            let rx_clone = receiver.clone();
1601            context.clone().spawn(move |context| async move {
1602                context.sleep(Duration::from_millis(500)).await;
1603                let msg = Bytes::from(vec![1u8; 30_000]);
1604                tx1.send(Recipients::One(rx_clone), msg, true)
1605                    .await
1606                    .unwrap();
1607            });
1608
1609            // Sender 2: sends 15KB at t=1.5s and shares the receiver with
1610            // sender 1, completing at roughly t=2.5s
1611            let mut tx2 = sender_txs[2].clone();
1612            let rx_clone = receiver.clone();
1613            context.clone().spawn(move |context| async move {
1614                context.sleep(Duration::from_millis(1500)).await;
1615                let msg = Bytes::from(vec![2u8; 15_000]);
1616                tx2.send(Recipients::One(rx_clone), msg, true)
1617                    .await
1618                    .unwrap();
1619            });
1620
1621            // Receive and verify timing
1622            // Message 0: starts at t=0, shares bandwidth after 0.5s,
1623            // and completes at t=1.5s (plus link latency)
1624            let (_, msg0) = receiver_rx.recv().await.unwrap();
1625            assert_eq!(msg0[0], 0);
1626            let t0 = context.current().duration_since(start).unwrap();
1627            assert!(
1628                t0 >= Duration::from_millis(1490) && t0 <= Duration::from_millis(1600),
1629                "Message 0 received at {t0:?}, expected ~1.5s",
1630            );
1631
1632            // The algorithm may deliver messages in a different order based on
1633            // efficient bandwidth usage. Let's collect the next two messages and
1634            // verify their timings regardless of order.
1635            let (_, msg_a) = receiver_rx.recv().await.unwrap();
1636            let t_a = context.current().duration_since(start).unwrap();
1637
1638            let (_, msg_b) = receiver_rx.recv().await.unwrap();
1639            let t_b = context.current().duration_since(start).unwrap();
1640
1641            // Figure out which message is which based on content
1642            let (msg1, t1, msg2, t2) = if msg_a[0] == 1 {
1643                (msg_a, t_a, msg_b, t_b)
1644            } else {
1645                (msg_b, t_b, msg_a, t_a)
1646            };
1647
1648            assert_eq!(msg1[0], 1);
1649            assert_eq!(msg2[0], 2);
1650
1651            // Message 1 (30KB) started at t=0.5s
1652            // Message 2 (15KB) started at t=1.5s
1653            // With efficient scheduling, message 2 might complete first since it's smaller
1654            // Both should complete between 1.5s and 2.5s
1655            assert!(
1656                t1 >= Duration::from_millis(1500) && t1 <= Duration::from_millis(2600),
1657                "Message 1 received at {t1:?}, expected between 1.5s-2.6s",
1658            );
1659
1660            assert!(
1661                t2 >= Duration::from_millis(1500) && t2 <= Duration::from_millis(2600),
1662                "Message 2 received at {t2:?}, expected between 1.5s-2.6s",
1663            );
1664        });
1665    }
1666
#[test]
fn test_dynamic_bandwidth_varied_sizes() {
    // Three senders with unlimited egress each send one message of a different
    // size to a single receiver whose ingress is capped at 30KB/s. All sends
    // are issued back-to-back, so the transfers compete for the receiver's
    // ingress. The test checks that every payload arrives exactly once and
    // intact, and that the shared ingress cap bounds the total time from below.
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let (network, mut oracle) = Network::new(
            context.with_label("network"),
            Config {
                max_size: 1024 * 1024,
                disconnect_on_block: true,
                tracked_peer_sets: None,
            },
        );
        network.start();

        // Create 3 senders
        let mut senders = Vec::new();
        let mut sender_txs = Vec::new();
        for i in 0..3 {
            let sender = ed25519::PrivateKey::from_seed(i).public_key();
            senders.push(sender.clone());
            let (tx, _) = oracle.control(sender.clone()).register(0).await.unwrap();
            sender_txs.push(tx);

            // Each sender has unlimited egress
            oracle
                .limit_bandwidth(sender.clone(), None, None)
                .await
                .unwrap();
        }

        // Create receiver with 30KB/s ingress
        let receiver = ed25519::PrivateKey::from_seed(100).public_key();
        let (_, mut receiver_rx) = oracle.control(receiver.clone()).register(0).await.unwrap();
        oracle
            .limit_bandwidth(receiver.clone(), None, Some(30_000))
            .await
            .unwrap();

        // Add links
        for sender in &senders {
            oracle
                .add_link(
                    sender.clone(),
                    receiver.clone(),
                    Link {
                        latency: Duration::from_millis(1),
                        jitter: Duration::ZERO,
                        success_rate: 1.0,
                    },
                )
                .await
                .unwrap();
        }

        let start = context.current();

        // All transfers start at the same time but with different sizes.
        // Every byte of message `i` is set to `i` so the receiver can tell
        // the payloads apart.
        let sizes = [10_000usize, 20_000, 30_000];
        for (i, (mut tx, size)) in sender_txs
            .into_iter()
            .zip(sizes.iter().copied())
            .enumerate()
        {
            let msg = Bytes::from(vec![i as u8; size]);
            tx.send(Recipients::One(receiver.clone()), msg, true)
                .await
                .unwrap();
        }

        // Receive all messages. The arrival order is not asserted here; each
        // payload is instead matched to its sender by the tag byte.
        let mut seen = [false; 3];
        let mut max_time = Duration::ZERO;
        for _ in 0..sizes.len() {
            let (_, msg) = receiver_rx.recv().await.unwrap();
            let t = context.current().duration_since(start).unwrap();

            // Every message must be delivered exactly once with its full payload
            let id = msg[0] as usize;
            assert!(id < sizes.len(), "unexpected payload tag {id}");
            assert!(!seen[id], "message {id} delivered more than once");
            seen[id] = true;
            assert_eq!(msg.len(), sizes[id], "message {id} has wrong length");

            max_time = max_time.max(t);
        }
        assert!(seen.iter().all(|&delivered| delivered));

        // Total data is 60KB at 30KB/s receiver ingress = 2s minimum
        assert!(
            max_time >= Duration::from_millis(2000),
            "Total time {max_time:?} should be at least 2s for 60KB at 30KB/s",
        );
    });
}
1762
#[test]
fn test_bandwidth_pipe_reservation_duration() {
    // Bandwidth should be occupied only while a message is being transmitted,
    // not while it is in flight on a high-latency link. Consecutive messages
    // can therefore be pipelined behind one another.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: None,
        };
        let (network, mut oracle) = Network::new(context.with_label("network"), config);
        network.start();

        // Two peers: one sends, one receives
        let sender = PrivateKey::from_seed(1).public_key();
        let receiver = PrivateKey::from_seed(2).public_key();
        let (mut sender_tx, _) = oracle.control(sender.clone()).register(0).await.unwrap();
        let (_, mut receiver_rx) = oracle.control(receiver.clone()).register(0).await.unwrap();

        // 1000 B/s on both ends, i.e. one byte per millisecond
        oracle
            .limit_bandwidth(sender.clone(), Some(1000), None)
            .await
            .unwrap();
        oracle
            .limit_bandwidth(receiver.clone(), None, Some(1000))
            .await
            .unwrap();

        // The link adds a full second of latency on top of transmission time
        let slow_link = Link {
            latency: Duration::from_secs(1),
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        oracle
            .add_link(sender.clone(), receiver.clone(), slow_link)
            .await
            .unwrap();

        // Three 500-byte messages take 500ms each to transmit at 1000 B/s.
        //
        // If the pipe stayed reserved for transmission + latency:
        //   - Msg 1: 0-1500ms
        //   - Msg 2: 1500-3000ms
        //   - Msg 3: 3000-4500ms
        // With the pipe reserved for transmission only (expected):
        //   - Msg 1: tx 0-500ms, delivered at 1500ms
        //   - Msg 2: tx 500-1000ms, delivered at 2000ms
        //   - Msg 3: tx 1000-1500ms, delivered at 2500ms
        let start = context.current();

        // Queue all three messages back-to-back; each payload is tagged with its index
        for tag in 0..3 {
            let payload = Bytes::from(vec![tag; 500]);
            sender_tx
                .send(Recipients::One(receiver.clone()), payload, false)
                .await
                .unwrap();
        }

        // Record when each message arrives and check they arrive in send order
        let mut arrivals = Vec::new();
        for expected_tag in 0..3 {
            let (_, payload) = receiver_rx.recv().await.unwrap();
            arrivals.push(context.current().duration_since(start).unwrap());
            assert_eq!(payload[0], expected_tag);
        }

        // Message k (0-based) should land in [1500 + 500k, 1600 + 500k) ms
        for (index, arrival) in arrivals.iter().enumerate() {
            let expected_min = 1500 + 500 * index as u64;
            let window =
                Duration::from_millis(expected_min)..Duration::from_millis(expected_min + 100);
            assert!(
                window.contains(arrival),
                "Message {} should arrive at ~{}ms, got {:?}",
                index + 1,
                expected_min,
                arrival
            );
        }
    });
}
1860
#[test]
fn test_dynamic_bandwidth_affects_new_transfers() {
    // Lowering a peer's bandwidth limit between two transfers must slow down
    // the transfer that starts after the change. The limit is only modified
    // once the first message has been fully received.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: None,
        };
        let (network, mut oracle) = Network::new(context.with_label("network"), config);
        network.start();

        let pk_sender = PrivateKey::from_seed(1).public_key();
        let pk_receiver = PrivateKey::from_seed(2).public_key();

        // Register both peers, then connect them with a near-instant link
        let (mut sender_tx, _) = oracle.control(pk_sender.clone()).register(0).await.unwrap();
        let (_, mut receiver_rx) = oracle
            .control(pk_receiver.clone())
            .register(0)
            .await
            .unwrap();
        let link = Link {
            latency: Duration::from_millis(1),
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        oracle
            .add_link(pk_sender.clone(), pk_receiver.clone(), link)
            .await
            .unwrap();

        // Start with 10 KB/s on both the sender egress and receiver ingress
        oracle
            .limit_bandwidth(pk_sender.clone(), Some(10_000), None)
            .await
            .unwrap();
        oracle
            .limit_bandwidth(pk_receiver.clone(), None, Some(10_000))
            .await
            .unwrap();

        // First transfer: 20 KB at 10 KB/s should take ~2s
        let msg1 = Bytes::from(vec![1u8; 20_000]);
        let start_time = context.current();
        sender_tx
            .send(Recipients::One(pk_receiver.clone()), msg1, false)
            .await
            .unwrap();
        let (_, received_msg1) = receiver_rx.recv().await.unwrap();
        let msg1_time = context.current().duration_since(start_time).unwrap();
        assert_eq!(received_msg1.len(), 20_000);
        let msg1_window = Duration::from_millis(1999)..=Duration::from_millis(2010);
        assert!(
            msg1_window.contains(&msg1_time),
            "First message should take ~2s, got {msg1_time:?}",
        );

        // Throttle the sender's egress down to 2 KB/s
        oracle
            .limit_bandwidth(pk_sender.clone(), Some(2_000), None)
            .await
            .unwrap();

        // Second transfer: 10 KB at 2 KB/s should take ~5s
        let msg2 = Bytes::from(vec![2u8; 10_000]);
        let msg2_start = context.current();
        sender_tx
            .send(Recipients::One(pk_receiver.clone()), msg2, false)
            .await
            .unwrap();
        let (_, received_msg2) = receiver_rx.recv().await.unwrap();
        let msg2_time = context.current().duration_since(msg2_start).unwrap();
        assert_eq!(received_msg2.len(), 10_000);
        let msg2_window = Duration::from_millis(4999)..=Duration::from_millis(5010);
        assert!(
            msg2_window.contains(&msg2_time),
            "Second message should take ~5s at reduced bandwidth, got {msg2_time:?}",
        );
    });
}
1953
#[test]
fn test_zero_receiver_ingress_bandwidth() {
    // A receiver whose ingress limit is 0 must not receive anything. Once the
    // limit is removed, the message that was already accepted for sending
    // must be delivered intact.
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let (network, mut oracle) = Network::new(
            context.with_label("network"),
            Config {
                max_size: 1024 * 1024,
                disconnect_on_block: true,
                tracked_peer_sets: None,
            },
        );
        network.start();

        let pk_sender = PrivateKey::from_seed(1).public_key();
        let pk_receiver = PrivateKey::from_seed(2).public_key();

        // Register peers and establish link
        let (mut sender_tx, _) = oracle.control(pk_sender.clone()).register(0).await.unwrap();
        let (_, mut receiver_rx) = oracle
            .control(pk_receiver.clone())
            .register(0)
            .await
            .unwrap();
        oracle
            .add_link(
                pk_sender.clone(),
                pk_receiver.clone(),
                Link {
                    latency: Duration::ZERO,
                    jitter: Duration::ZERO,
                    success_rate: 1.0,
                },
            )
            .await
            .unwrap();

        // Set receiver ingress bandwidth to 0
        oracle
            .limit_bandwidth(pk_receiver.clone(), None, Some(0))
            .await
            .unwrap();

        // Send message to receiver (the send itself is still reported as accepted)
        let msg1 = Bytes::from(vec![1u8; 20_000]); // 20 KB
        let sent = sender_tx
            .send(Recipients::One(pk_receiver.clone()), msg1.clone(), false)
            .await
            .unwrap();
        assert_eq!(sent.len(), 1);
        assert_eq!(sent[0], pk_receiver);

        // Message should not be received after 10 seconds
        select! {
            _ = receiver_rx.recv() => {
                panic!("unexpected message");
            },
            _ = context.sleep(Duration::from_secs(10)) => {},
        }

        // Unset bandwidth
        oracle
            .limit_bandwidth(pk_receiver.clone(), None, None)
            .await
            .unwrap();

        // Message should be immediately received. Unwrap the result so that a
        // closed channel or a wrong payload cannot pass for a delivery.
        select! {
            result = receiver_rx.recv() => {
                let (from, received) = result.unwrap();
                assert_eq!(from, pk_sender);
                assert!(received == msg1, "delivered payload differs from the one sent");
            },
            _ = context.sleep(Duration::from_secs(1)) => {
                panic!("timeout");
            },
        }
    });
}
2029
#[test]
fn test_zero_sender_egress_bandwidth() {
    // A sender whose egress limit is 0 must not get anything delivered. Once
    // the limit is removed, the message that was already accepted for sending
    // must be delivered intact.
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let (network, mut oracle) = Network::new(
            context.with_label("network"),
            Config {
                max_size: 1024 * 1024,
                disconnect_on_block: true,
                tracked_peer_sets: None,
            },
        );
        network.start();

        let pk_sender = PrivateKey::from_seed(1).public_key();
        let pk_receiver = PrivateKey::from_seed(2).public_key();

        // Register peers and establish link
        let (mut sender_tx, _) = oracle.control(pk_sender.clone()).register(0).await.unwrap();
        let (_, mut receiver_rx) = oracle
            .control(pk_receiver.clone())
            .register(0)
            .await
            .unwrap();
        oracle
            .add_link(
                pk_sender.clone(),
                pk_receiver.clone(),
                Link {
                    latency: Duration::ZERO,
                    jitter: Duration::ZERO,
                    success_rate: 1.0,
                },
            )
            .await
            .unwrap();

        // Set sender egress bandwidth to 0
        oracle
            .limit_bandwidth(pk_sender.clone(), Some(0), None)
            .await
            .unwrap();

        // Send message to receiver (the send itself is still reported as accepted)
        let msg1 = Bytes::from(vec![1u8; 20_000]); // 20 KB
        let sent = sender_tx
            .send(Recipients::One(pk_receiver.clone()), msg1.clone(), false)
            .await
            .unwrap();
        assert_eq!(sent.len(), 1);
        assert_eq!(sent[0], pk_receiver);

        // Message should not be received after 10 seconds
        select! {
            _ = receiver_rx.recv() => {
                panic!("unexpected message");
            },
            _ = context.sleep(Duration::from_secs(10)) => {},
        }

        // Unset bandwidth
        oracle
            .limit_bandwidth(pk_sender.clone(), None, None)
            .await
            .unwrap();

        // Message should be immediately received. Unwrap the result so that a
        // closed channel or a wrong payload cannot pass for a delivery.
        select! {
            result = receiver_rx.recv() => {
                let (from, received) = result.unwrap();
                assert_eq!(from, pk_sender);
                assert!(received == msg1, "delivered payload differs from the one sent");
            },
            _ = context.sleep(Duration::from_secs(1)) => {
                panic!("timeout");
            },
        }
    });
}
2105
#[test]
fn register_peer_set() {
    // Registering a peer set through the oracle makes it retrievable by id.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: Some(3),
        };
        let (network, mut oracle) = Network::new(context.with_label("network"), config);
        network.start();

        // Before any update, id 0 is expected to resolve to an empty set
        assert_eq!(oracle.peer_set(0).await, Some([].into()));

        // Register two peers under id 0xFF and read the set back
        let members = [
            PrivateKey::from_seed(1).public_key(),
            PrivateKey::from_seed(2).public_key(),
        ];
        oracle.update(0xFF, members.clone().into()).await;

        let stored = oracle.peer_set(0xFF).await.unwrap();
        assert_eq!(stored, members.into());
    });
}
2129
#[test]
fn test_peer_set_window_management() {
    // With a window of two tracked peer sets, registering a third set evicts
    // the oldest one. Sends are only accepted towards peers that appear in a
    // currently tracked set.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: Some(2), // window of 2 peer sets
        };
        let (network, mut oracle) = Network::new(context.with_label("network"), config);
        network.start();

        // Four peers in total
        let pk1 = PrivateKey::from_seed(1).public_key();
        let pk2 = PrivateKey::from_seed(2).public_key();
        let pk3 = PrivateKey::from_seed(3).public_key();
        let pk4 = PrivateKey::from_seed(4).public_key();

        // Peer set 1 = {pk1, pk2}
        oracle
            .update(1, vec![pk1.clone(), pk2.clone()].into())
            .await;

        // Every peer registers channel 0; receivers are kept alive for the whole test
        let (mut sender1, _receiver1) = oracle.control(pk1.clone()).register(0).await.unwrap();
        let (mut sender2, _receiver2) = oracle.control(pk2.clone()).register(0).await.unwrap();
        let (mut sender3, _receiver3) = oracle.control(pk3.clone()).register(0).await.unwrap();
        let (_sender4, _receiver4) = oracle.control(pk4.clone()).register(0).await.unwrap();

        // Fully connect the peers (one directed link per ordered pair)
        let peers = [pk1.clone(), pk2.clone(), pk3.clone(), pk4.clone()];
        for from in &peers {
            for to in &peers {
                if from == to {
                    continue;
                }
                let link = Link {
                    latency: Duration::from_millis(1),
                    jitter: Duration::ZERO,
                    success_rate: 1.0,
                };
                oracle
                    .add_link(from.clone(), to.clone(), link)
                    .await
                    .unwrap();
            }
        }

        // pk1 -> pk2: both are in set 1, so the send is accepted
        let accepted = sender1
            .send(Recipients::One(pk2.clone()), Bytes::from("msg1"), false)
            .await
            .unwrap();
        assert_eq!(accepted.len(), 1);

        // pk1 -> pk3: pk3 is in no tracked set yet, so nothing is sent
        let accepted = sender1
            .send(Recipients::One(pk3.clone()), Bytes::from("msg2"), false)
            .await
            .unwrap();
        assert!(accepted.is_empty());

        // Peer set 2 = {pk2, pk3}
        oracle
            .update(2, vec![pk2.clone(), pk3.clone()].into())
            .await;

        // pk1 -> pk3 now goes through because pk3 is tracked
        let accepted = sender1
            .send(Recipients::One(pk3.clone()), Bytes::from("msg3"), false)
            .await
            .unwrap();
        assert_eq!(accepted.len(), 1);

        // Peer set 3 = {pk3, pk4}; the window holds 2 sets, so set 1 is evicted
        oracle
            .update(3, vec![pk3.clone(), pk4.clone()].into())
            .await;

        // pk2 -> pk1: pk1 only appeared in the evicted set, so nothing is sent
        let accepted = sender2
            .send(Recipients::One(pk1.clone()), Bytes::from("msg4"), false)
            .await
            .unwrap();
        assert!(accepted.is_empty());

        // pk2 -> pk3: pk3 is in sets 2 and 3
        let accepted = sender2
            .send(Recipients::One(pk3.clone()), Bytes::from("msg5"), false)
            .await
            .unwrap();
        assert_eq!(accepted.len(), 1);

        // pk3 -> pk4: pk4 is in set 3
        let accepted = sender3
            .send(Recipients::One(pk4.clone()), Bytes::from("msg6"), false)
            .await
            .unwrap();
        assert_eq!(accepted.len(), 1);

        // The two retained sets still hold their members
        let peer_set_2 = oracle.peer_set(2).await.unwrap();
        for member in [&pk2, &pk3] {
            assert!(peer_set_2.as_ref().contains(member));
        }
        let peer_set_3 = oracle.peer_set(3).await.unwrap();
        for member in [&pk3, &pk4] {
            assert!(peer_set_3.as_ref().contains(member));
        }

        // The evicted set can no longer be looked up
        assert!(oracle.peer_set(1).await.is_none());
    });
}
2247
#[test]
fn test_subscribe_to_peer_sets() {
    // A subscriber is notified of every peer set update with the set id, the
    // newly registered set, and the union of all peers in the tracked window
    // (two sets in this configuration).
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: Some(2),
        };
        let (network, mut oracle) = Network::new(context.with_label("network"), config);
        network.start();

        // Subscribe before any peer set is registered
        let mut subscription = oracle.subscribe().await;

        let pk1 = PrivateKey::from_seed(1).public_key();
        let pk2 = PrivateKey::from_seed(2).public_key();
        let pk3 = PrivateKey::from_seed(3).public_key();

        // Each round: (peer set id, members to register, expected tracked peers)
        let rounds = [
            (
                1,
                vec![pk1.clone(), pk2.clone()],
                vec![pk1.clone(), pk2.clone()],
            ),
            (
                2,
                vec![pk2.clone(), pk3.clone()],
                vec![pk1.clone(), pk2.clone(), pk3.clone()],
            ),
            (
                3,
                vec![pk1.clone(), pk3.clone()],
                vec![pk1.clone(), pk2.clone(), pk3.clone()],
            ),
            // By now only sets 3 and 4 are tracked, so pk2 drops out
            (
                4,
                vec![pk1.clone(), pk3.clone()],
                vec![pk1.clone(), pk3.clone()],
            ),
        ];

        for (id, members, tracked) in rounds {
            // Register the set, then check the matching notification
            oracle.update(id, members.clone().into()).await;

            let (peer_set_id, peer_set, all) = subscription.next().await.unwrap();
            assert_eq!(peer_set_id, id);
            assert_eq!(peer_set, members.into());
            assert_eq!(all, tracked.into());
        }
    });
}
2315
#[test]
fn test_multiple_subscriptions() {
    // Every live subscription is notified of each peer set update, and
    // dropping one subscription does not disturb the others.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: Some(3),
        };
        let (network, mut oracle) = Network::new(context.with_label("network"), config);
        network.start();

        // Open three independent subscriptions
        let mut subscription1 = oracle.subscribe().await;
        let mut subscription2 = oracle.subscribe().await;
        let mut subscription3 = oracle.subscribe().await;

        let pk1 = PrivateKey::from_seed(1).public_key();
        let pk2 = PrivateKey::from_seed(2).public_key();

        // First update: all three subscriptions must see id 1
        oracle
            .update(1, vec![pk1.clone(), pk2.clone()].into())
            .await;
        for subscription in [
            &mut subscription1,
            &mut subscription2,
            &mut subscription3,
        ] {
            let (id, _, _) = subscription.next().await.unwrap();
            assert_eq!(id, 1);
        }

        // Close the middle subscription
        drop(subscription2);

        // Second update: the two remaining subscriptions must see id 2
        oracle
            .update(2, vec![pk1.clone(), pk2.clone()].into())
            .await;
        for subscription in [&mut subscription1, &mut subscription3] {
            let (id, _, _) = subscription.next().await.unwrap();
            assert_eq!(id, 2);
        }
    });
}
2369}