commonware_p2p/simulated/
mod.rs

1//! Simulate networking between peers with configurable link behavior (i.e. drops, latency, corruption, etc.).
2//!
3//! Both peer and link modification can be performed dynamically over the lifetime of the simulated network. This
4//! can be used to mimic transient network partitions, offline nodes (that later connect), and/or degrading link
5//! quality. Messages on a link are delivered in order, and optional per-peer bandwidth limits account for
6//! transmission delay and queueing.
7//!
8//! # Determinism
9//!
10//! `commonware-p2p::simulated` can be run deterministically when paired with `commonware-runtime::deterministic`.
11//! This makes it possible to reproduce an arbitrary order of delivered/dropped messages with a given seed.
12//!
13//! # Bandwidth Simulation
14//!
15//! The simulator provides a realistic model of bandwidth contention where network
16//! capacity is a shared, finite resource. This is achieved without the overhead
17//! of simulating byte-by-byte transfers.
18//!
19//! ## Core Model
20//!
21//! The bandwidth model is built on an event-based timeline. Instead of simulating
22//! continuous data flow, the scheduler calculates the key points in time where
23//! bandwidth availability changes for a peer. These changes occur whenever a
24//! transfer starts or finishes.
25//!
26//! Each peer has a schedule for its egress (upload) and ingress (download)
27//! bandwidth. When a new message is sent, the scheduler performs the following steps:
28//!
29//! 1. **Find Available Capacity:** It looks at the sender's egress schedule and the
30//!    receiver's ingress schedule to find the available bandwidth over time. The
31//!    effective transfer rate is always limited by the minimum of the two.
32//!
33//! 2. **Fill Time Slots:** The algorithm iterates through time, finding "slots" of
34//!    available bandwidth. It calculates how much of the message can be sent in
35//!    each slot until the entire message is accounted for. For example, if two
36//!    10KB messages are sent over a 10KB/s link, the second message will be
37//!    scheduled to start only after the first one completes.
38//!
39//! 3. **Reserve Bandwidth:** Once the completion time is calculated, the new
40//!    transfer is added to the schedules of both the sender and receiver as a
41//!    bandwidth reservation, consuming capacity for its calculated duration.
42//!
43//! ## Latency vs. Transmission Delay
44//!
45//! The simulation correctly distinguishes between two key components of message delivery:
46//!
47//! - **Transmission Delay:** The time it takes to send all bytes of a message over
48//!   the link. This is determined by the message size and the available bandwidth
49//!   (e.g., a 10KB message on a 10KB/s link has a 1-second transmission delay).
50//! - **Network Latency:** The time it takes for a byte to travel from the sender
51//!   to the receiver, independent of bandwidth. This is configured via the `Link`
52//!   properties.
53//!
54//! The final delivery time of a message is the sum of when its transmission completes
55//! plus the simulated network latency. This model ensures that large messages correctly
56//! occupy the network link for longer periods, affecting other concurrent transfers,
57//! while still accounting for the physical travel time of the data.
58//!
59//! # Example
60//!
61//! ```rust
62//! use commonware_p2p::simulated::{Config, Link, Network};
63//! use commonware_cryptography::{ed25519, PrivateKey, Signer as _, PublicKey as _, PrivateKeyExt as _};
64//! use commonware_runtime::{deterministic, Spawner, Runner, Metrics};
65//! use std::time::Duration;
66//!
67//! // Generate peers
68//! let peers = vec![
69//!     ed25519::PrivateKey::from_seed(0).public_key(),
70//!     ed25519::PrivateKey::from_seed(1).public_key(),
71//!     ed25519::PrivateKey::from_seed(2).public_key(),
72//!     ed25519::PrivateKey::from_seed(3).public_key(),
73//! ];
74//!
75//! // Configure network
76//! let p2p_cfg = Config {
77//!     max_size: 1024 * 1024, // 1MB
78//! };
79//!
80//! // Start context
81//! let executor = deterministic::Runner::seeded(0);
82//! executor.start(|context| async move {
83//!     // Initialize network
84//!     let (network, mut oracle) = Network::new(context.with_label("network"), p2p_cfg);
85//!
86//!     // Start network
87//!     let network_handler = network.start();
88//!
89//!     // Register peers
90//!     let (sender1, receiver1) = oracle.register(peers[0].clone(), 0).await.unwrap();
91//!     let (sender2, receiver2) = oracle.register(peers[1].clone(), 0).await.unwrap();
92//!
93//!     // Set bandwidth limits
94//!     // peer[0]: 10KB/s egress, unlimited ingress
95//!     // peer[1]: unlimited egress, 5KB/s ingress
96//!     oracle.set_bandwidth(peers[0].clone(), 10_000, usize::MAX).await.unwrap();
97//!     oracle.set_bandwidth(peers[1].clone(), usize::MAX, 5_000).await.unwrap();
98//!
99//!     // Link 2 peers
100//!     oracle.add_link(
101//!         peers[0].clone(),
102//!         peers[1].clone(),
103//!         Link {
104//!             latency: Duration::from_millis(5),
105//!             jitter: Duration::from_millis(2),
106//!             success_rate: 0.75,
107//!         },
108//!     ).await.unwrap();
109//!
110//!     // ... Use sender and receiver ...
111//!
112//!     // Update link
113//!     oracle.remove_link(
114//!         peers[0].clone(),
115//!         peers[1].clone(),
116//!     ).await.unwrap();
117//!     oracle.add_link(
118//!         peers[0].clone(),
119//!         peers[1].clone(),
120//!         Link {
121//!             latency: Duration::from_millis(100),
122//!             jitter: Duration::from_millis(25),
123//!             success_rate: 0.8,
124//!         },
125//!     ).await.unwrap();
126//!
127//!     // ... Use sender and receiver ...
128//!
129//!     // Shutdown network
130//!     network_handler.abort();
131//! });
132//! ```
133
134mod bandwidth;
135mod ingress;
136mod metrics;
137mod network;
138
139use thiserror::Error;
140
141/// Errors that can occur when interacting with the network.
142#[derive(Debug, Error)]
143pub enum Error {
144    #[error("message too large: {0}")]
145    MessageTooLarge(usize),
146    #[error("network closed")]
147    NetworkClosed,
148    #[error("not valid to link self")]
149    LinkingSelf,
150    #[error("link already exists")]
151    LinkExists,
152    #[error("link missing")]
153    LinkMissing,
154    #[error("invalid success rate (must be in [0, 1]): {0}")]
155    InvalidSuccessRate(f64),
156    #[error("channel already registered: {0}")]
157    ChannelAlreadyRegistered(u32),
158    #[error("send_frame failed")]
159    SendFrameFailed,
160    #[error("recv_frame failed")]
161    RecvFrameFailed,
162    #[error("bind failed")]
163    BindFailed,
164    #[error("accept failed")]
165    AcceptFailed,
166    #[error("dial failed")]
167    DialFailed,
168    #[error("peer missing")]
169    PeerMissing,
170}
171
172pub use ingress::{Link, Oracle};
173pub use network::{Config, Network, Receiver, Sender};
174
175#[cfg(test)]
176mod tests {
177    use super::*;
178    use crate::{Receiver, Recipients, Sender};
179    use bytes::Bytes;
180    use commonware_cryptography::{
181        ed25519::{self, PrivateKey, PublicKey},
182        PrivateKeyExt as _, Signer as _,
183    };
184    use commonware_macros::select;
185    use commonware_runtime::{deterministic, Clock, Metrics, Runner, Spawner};
186    use futures::{channel::mpsc, future::join_all, SinkExt, StreamExt};
187    use rand::Rng;
188    use std::{
189        collections::{BTreeMap, HashMap, HashSet},
190        time::Duration,
191    };
192
193    fn simulate_messages(seed: u64, size: usize) -> (String, Vec<usize>) {
194        let executor = deterministic::Runner::seeded(seed);
195        executor.start(|context| async move {
196            // Create simulated network
197            let (network, mut oracle) = Network::new(
198                context.with_label("network"),
199                Config {
200                    max_size: 1024 * 1024,
201                },
202            );
203
204            // Start network
205            network.start();
206
207            // Register agents
208            let mut agents = BTreeMap::new();
209            let (seen_sender, mut seen_receiver) = mpsc::channel(1024);
210            for i in 0..size {
211                let pk = PrivateKey::from_seed(i as u64).public_key();
212                let (sender, mut receiver) = oracle.register(pk.clone(), 0).await.unwrap();
213                agents.insert(pk, sender);
214                let mut agent_sender = seen_sender.clone();
215                context
216                    .with_label("agent_receiver")
217                    .spawn(move |_| async move {
218                        for _ in 0..size {
219                            receiver.recv().await.unwrap();
220                        }
221                        agent_sender.send(i).await.unwrap();
222
223                        // Exiting early here tests the case where the recipient end of an agent is dropped
224                    });
225            }
226
227            // Randomly link agents
228            let only_inbound = PrivateKey::from_seed(0).public_key();
229            for agent in agents.keys() {
230                if agent == &only_inbound {
231                    // Test that we can gracefully handle missing links
232                    continue;
233                }
234                for other in agents.keys() {
235                    let result = oracle
236                        .add_link(
237                            agent.clone(),
238                            other.clone(),
239                            Link {
240                                latency: Duration::from_millis(5),
241                                jitter: Duration::from_millis(2),
242                                success_rate: 0.75,
243                            },
244                        )
245                        .await;
246                    if agent == other {
247                        assert!(matches!(result, Err(Error::LinkingSelf)));
248                    } else {
249                        assert!(result.is_ok());
250                    }
251                }
252            }
253
254            // Send messages
255            context
256                .with_label("agent_sender")
257                .spawn(|mut context| async move {
258                    // Sort agents for deterministic output
259                    let keys = agents.keys().collect::<Vec<_>>();
260
261                    // Send messages
262                    loop {
263                        let index = context.gen_range(0..keys.len());
264                        let sender = keys[index];
265                        let msg = format!("hello from {sender:?}");
266                        let msg = Bytes::from(msg);
267                        let mut message_sender = agents.get(sender).unwrap().clone();
268                        let sent = message_sender
269                            .send(Recipients::All, msg.clone(), false)
270                            .await
271                            .unwrap();
272                        if sender == &only_inbound {
273                            assert_eq!(sent.len(), 0);
274                        } else {
275                            assert_eq!(sent.len(), keys.len() - 1);
276                        }
277                    }
278                });
279
280            // Wait for all recipients
281            let mut results = Vec::new();
282            for _ in 0..size {
283                results.push(seen_receiver.next().await.unwrap());
284            }
285            (context.auditor().state(), results)
286        })
287    }
288
289    fn compare_outputs(seeds: u64, size: usize) {
290        // Collect outputs
291        let mut outputs = Vec::new();
292        for seed in 0..seeds {
293            outputs.push(simulate_messages(seed, size));
294        }
295
296        // Confirm outputs are deterministic
297        for seed in 0..seeds {
298            let output = simulate_messages(seed, size);
299            assert_eq!(output, outputs[seed as usize]);
300        }
301    }
302
303    #[test]
304    fn test_determinism() {
305        compare_outputs(25, 25);
306    }
307
308    #[test]
309    fn test_message_too_big() {
310        let executor = deterministic::Runner::default();
311        executor.start(|mut context| async move {
312            // Create simulated network
313            let (network, mut oracle) = Network::new(
314                context.with_label("network"),
315                Config {
316                    max_size: 1024 * 1024,
317                },
318            );
319
320            // Start network
321            network.start();
322
323            // Register agents
324            let mut agents = HashMap::new();
325            for i in 0..10 {
326                let pk = PrivateKey::from_seed(i as u64).public_key();
327                let (sender, _) = oracle.register(pk.clone(), 0).await.unwrap();
328                agents.insert(pk, sender);
329            }
330
331            // Send invalid message
332            let keys = agents.keys().collect::<Vec<_>>();
333            let index = context.gen_range(0..keys.len());
334            let sender = keys[index];
335            let mut message_sender = agents.get(sender).unwrap().clone();
336            let mut msg = vec![0u8; 1024 * 1024 + 1];
337            context.fill(&mut msg[..]);
338            let result = message_sender
339                .send(Recipients::All, msg.into(), false)
340                .await
341                .unwrap_err();
342
343            // Confirm error is correct
344            assert!(matches!(result, Error::MessageTooLarge(_)));
345        });
346    }
347
348    #[test]
349    fn test_linking_self() {
350        let executor = deterministic::Runner::default();
351        executor.start(|context| async move {
352            // Create simulated network
353            let (network, mut oracle) = Network::new(
354                context.with_label("network"),
355                Config {
356                    max_size: 1024 * 1024,
357                },
358            );
359
360            // Start network
361            network.start();
362
363            // Register agents
364            let pk = PrivateKey::from_seed(0).public_key();
365            oracle.register(pk.clone(), 0).await.unwrap();
366
367            // Attempt to link self
368            let result = oracle
369                .add_link(
370                    pk.clone(),
371                    pk,
372                    Link {
373                        latency: Duration::from_millis(5),
374                        jitter: Duration::from_millis(2),
375                        success_rate: 0.75,
376                    },
377                )
378                .await;
379
380            // Confirm error is correct
381            assert!(matches!(result, Err(Error::LinkingSelf)));
382        });
383    }
384
385    #[test]
386    fn test_duplicate_channel() {
387        let executor = deterministic::Runner::default();
388        executor.start(|context| async move {
389            // Create simulated network
390            let (network, mut oracle) = Network::new(
391                context.with_label("network"),
392                Config {
393                    max_size: 1024 * 1024,
394                },
395            );
396
397            // Start network
398            network.start();
399
400            // Register agents
401            let pk = PrivateKey::from_seed(0).public_key();
402            oracle.register(pk.clone(), 0).await.unwrap();
403            let result = oracle.register(pk, 0).await;
404
405            // Confirm error is correct
406            assert!(matches!(result, Err(Error::ChannelAlreadyRegistered(0))));
407        });
408    }
409
410    #[test]
411    fn test_invalid_success_rate() {
412        let executor = deterministic::Runner::default();
413        executor.start(|context| async move {
414            // Create simulated network
415            let (network, mut oracle) = Network::new(
416                context.with_label("network"),
417                Config {
418                    max_size: 1024 * 1024,
419                },
420            );
421
422            // Start network
423            network.start();
424
425            // Register agents
426            let pk1 = PrivateKey::from_seed(0).public_key();
427            let pk2 = PrivateKey::from_seed(1).public_key();
428            oracle.register(pk1.clone(), 0).await.unwrap();
429            oracle.register(pk2.clone(), 0).await.unwrap();
430
431            // Attempt to link with invalid success rate
432            let result = oracle
433                .add_link(
434                    pk1,
435                    pk2,
436                    Link {
437                        latency: Duration::from_millis(5),
438                        jitter: Duration::from_millis(2),
439                        success_rate: 1.5,
440                    },
441                )
442                .await;
443
444            // Confirm error is correct
445            assert!(matches!(result, Err(Error::InvalidSuccessRate(_))));
446        });
447    }
448
449    #[test]
450    fn test_simple_message_delivery() {
451        let executor = deterministic::Runner::default();
452        executor.start(|context| async move {
453            // Create simulated network
454            let (network, mut oracle) = Network::new(
455                context.with_label("network"),
456                Config {
457                    max_size: 1024 * 1024,
458                },
459            );
460
461            // Start network
462            network.start();
463
464            // Register agents
465            let pk1 = PrivateKey::from_seed(0).public_key();
466            let pk2 = PrivateKey::from_seed(1).public_key();
467            let (mut sender1, mut receiver1) = oracle.register(pk1.clone(), 0).await.unwrap();
468            let (mut sender2, mut receiver2) = oracle.register(pk2.clone(), 0).await.unwrap();
469
470            // Register unused channels
471            let _ = oracle.register(pk1.clone(), 1).await.unwrap();
472            let _ = oracle.register(pk2.clone(), 2).await.unwrap();
473
474            // Link agents
475            oracle
476                .add_link(
477                    pk1.clone(),
478                    pk2.clone(),
479                    Link {
480                        latency: Duration::from_millis(5),
481                        jitter: Duration::from_millis(2),
482                        success_rate: 1.0,
483                    },
484                )
485                .await
486                .unwrap();
487            oracle
488                .add_link(
489                    pk2.clone(),
490                    pk1.clone(),
491                    Link {
492                        latency: Duration::from_millis(5),
493                        jitter: Duration::from_millis(2),
494                        success_rate: 1.0,
495                    },
496                )
497                .await
498                .unwrap();
499
500            // Send messages
501            let msg1 = Bytes::from("hello from pk1");
502            let msg2 = Bytes::from("hello from pk2");
503            sender1
504                .send(Recipients::One(pk2.clone()), msg1.clone(), false)
505                .await
506                .unwrap();
507            sender2
508                .send(Recipients::One(pk1.clone()), msg2.clone(), false)
509                .await
510                .unwrap();
511
512            // Confirm message delivery
513            let (sender, message) = receiver1.recv().await.unwrap();
514            assert_eq!(sender, pk2);
515            assert_eq!(message, msg2);
516            let (sender, message) = receiver2.recv().await.unwrap();
517            assert_eq!(sender, pk1);
518            assert_eq!(message, msg1);
519        });
520    }
521
522    #[test]
523    fn test_send_wrong_channel() {
524        let executor = deterministic::Runner::default();
525        executor.start(|context| async move {
526            // Create simulated network
527            let (network, mut oracle) = Network::new(
528                context.with_label("network"),
529                Config {
530                    max_size: 1024 * 1024,
531                },
532            );
533
534            // Start network
535            network.start();
536
537            // Register agents
538            let pk1 = PrivateKey::from_seed(0).public_key();
539            let pk2 = PrivateKey::from_seed(1).public_key();
540            let (mut sender1, _) = oracle.register(pk1.clone(), 0).await.unwrap();
541            let (_, mut receiver2) = oracle.register(pk2.clone(), 1).await.unwrap();
542
543            // Link agents
544            oracle
545                .add_link(
546                    pk1,
547                    pk2.clone(),
548                    Link {
549                        latency: Duration::from_millis(5),
550                        jitter: Duration::ZERO,
551                        success_rate: 1.0,
552                    },
553                )
554                .await
555                .unwrap();
556
557            // Send message
558            let msg = Bytes::from("hello from pk1");
559            sender1
560                .send(Recipients::One(pk2), msg, false)
561                .await
562                .unwrap();
563
564            // Confirm no message delivery
565            select! {
566                _ = receiver2.recv() => {
567                    panic!("unexpected message");
568                },
569                _ = context.sleep(Duration::from_secs(1)) => {},
570            }
571        });
572    }
573
574    #[test]
575    fn test_dynamic_peers() {
576        let executor = deterministic::Runner::default();
577        executor.start(|context| async move {
578            // Create simulated network
579            let (network, mut oracle) = Network::new(
580                context.with_label("network"),
581                Config {
582                    max_size: 1024 * 1024,
583                },
584            );
585
586            // Start network
587            network.start();
588
589            // Define agents
590            let pk1 = PrivateKey::from_seed(0).public_key();
591            let pk2 = PrivateKey::from_seed(1).public_key();
592            let (mut sender1, mut receiver1) = oracle.register(pk1.clone(), 0).await.unwrap();
593            let (mut sender2, mut receiver2) = oracle.register(pk2.clone(), 0).await.unwrap();
594
595            // Link agents
596            oracle
597                .add_link(
598                    pk1.clone(),
599                    pk2.clone(),
600                    Link {
601                        latency: Duration::from_millis(5),
602                        jitter: Duration::from_millis(2),
603                        success_rate: 1.0,
604                    },
605                )
606                .await
607                .unwrap();
608            oracle
609                .add_link(
610                    pk2.clone(),
611                    pk1.clone(),
612                    Link {
613                        latency: Duration::from_millis(5),
614                        jitter: Duration::from_millis(2),
615                        success_rate: 1.0,
616                    },
617                )
618                .await
619                .unwrap();
620
621            // Send messages
622            let msg1 = Bytes::from("attempt 1: hello from pk1");
623            let msg2 = Bytes::from("attempt 1: hello from pk2");
624            sender1
625                .send(Recipients::One(pk2.clone()), msg1.clone(), false)
626                .await
627                .unwrap();
628            sender2
629                .send(Recipients::One(pk1.clone()), msg2.clone(), false)
630                .await
631                .unwrap();
632
633            // Confirm message delivery
634            let (sender, message) = receiver1.recv().await.unwrap();
635            assert_eq!(sender, pk2);
636            assert_eq!(message, msg2);
637            let (sender, message) = receiver2.recv().await.unwrap();
638            assert_eq!(sender, pk1);
639            assert_eq!(message, msg1);
640        });
641    }
642
643    #[test]
644    fn test_dynamic_links() {
645        let executor = deterministic::Runner::default();
646        executor.start(|context| async move {
647            // Create simulated network
648            let (network, mut oracle) = Network::new(
649                context.with_label("network"),
650                Config {
651                    max_size: 1024 * 1024,
652                },
653            );
654
655            // Start network
656            network.start();
657
658            // Register agents
659            let pk1 = PrivateKey::from_seed(0).public_key();
660            let pk2 = PrivateKey::from_seed(1).public_key();
661            let (mut sender1, mut receiver1) = oracle.register(pk1.clone(), 0).await.unwrap();
662            let (mut sender2, mut receiver2) = oracle.register(pk2.clone(), 0).await.unwrap();
663
664            // Send messages
665            let msg1 = Bytes::from("attempt 1: hello from pk1");
666            let msg2 = Bytes::from("attempt 1: hello from pk2");
667            sender1
668                .send(Recipients::One(pk2.clone()), msg1.clone(), false)
669                .await
670                .unwrap();
671            sender2
672                .send(Recipients::One(pk1.clone()), msg2.clone(), false)
673                .await
674                .unwrap();
675
676            // Confirm no message delivery
677            select! {
678                _ = receiver1.recv() => {
679                    panic!("unexpected message");
680                },
681                _ = receiver2.recv() => {
682                    panic!("unexpected message");
683                },
684                _ = context.sleep(Duration::from_secs(1)) => {},
685            }
686
687            // Link agents
688            oracle
689                .add_link(
690                    pk1.clone(),
691                    pk2.clone(),
692                    Link {
693                        latency: Duration::from_millis(5),
694                        jitter: Duration::from_millis(2),
695                        success_rate: 1.0,
696                    },
697                )
698                .await
699                .unwrap();
700            oracle
701                .add_link(
702                    pk2.clone(),
703                    pk1.clone(),
704                    Link {
705                        latency: Duration::from_millis(5),
706                        jitter: Duration::from_millis(2),
707                        success_rate: 1.0,
708                    },
709                )
710                .await
711                .unwrap();
712
713            // Send messages
714            let msg1 = Bytes::from("attempt 2: hello from pk1");
715            let msg2 = Bytes::from("attempt 2: hello from pk2");
716            sender1
717                .send(Recipients::One(pk2.clone()), msg1.clone(), false)
718                .await
719                .unwrap();
720            sender2
721                .send(Recipients::One(pk1.clone()), msg2.clone(), false)
722                .await
723                .unwrap();
724
725            // Confirm message delivery
726            let (sender, message) = receiver1.recv().await.unwrap();
727            assert_eq!(sender, pk2);
728            assert_eq!(message, msg2);
729            let (sender, message) = receiver2.recv().await.unwrap();
730            assert_eq!(sender, pk1);
731            assert_eq!(message, msg1);
732
733            // Remove links
734            oracle.remove_link(pk1.clone(), pk2.clone()).await.unwrap();
735            oracle.remove_link(pk2.clone(), pk1.clone()).await.unwrap();
736
737            // Send messages
738            let msg1 = Bytes::from("attempt 3: hello from pk1");
739            let msg2 = Bytes::from("attempt 3: hello from pk2");
740            sender1
741                .send(Recipients::One(pk2.clone()), msg1.clone(), false)
742                .await
743                .unwrap();
744            sender2
745                .send(Recipients::One(pk1.clone()), msg2.clone(), false)
746                .await
747                .unwrap();
748
749            // Confirm no message delivery
750            select! {
751                _ = receiver1.recv() => {
752                    panic!("unexpected message");
753                },
754                _ = receiver2.recv() => {
755                    panic!("unexpected message");
756                },
757                _ = context.sleep(Duration::from_secs(1)) => {},
758            }
759
760            // Remove non-existent links
761            let result = oracle.remove_link(pk1, pk2).await;
762            assert!(matches!(result, Err(Error::LinkMissing)));
763        });
764    }
765
766    async fn test_bandwidth_between_peers(
767        context: &mut deterministic::Context,
768        oracle: &mut Oracle<PublicKey>,
769        sender_bps: Option<usize>,
770        receiver_bps: Option<usize>,
771        message_size: usize,
772        expected_duration_ms: u64,
773    ) {
774        // Create two agents
775        let pk1 = PrivateKey::from_seed(context.gen::<u64>()).public_key();
776        let pk2 = PrivateKey::from_seed(context.gen::<u64>()).public_key();
777        let (mut sender, _) = oracle.register(pk1.clone(), 0).await.unwrap();
778        let (_, mut receiver) = oracle.register(pk2.clone(), 0).await.unwrap();
779
780        // Set bandwidth limits
781        oracle
782            .set_bandwidth(pk1.clone(), sender_bps.unwrap_or(usize::MAX), usize::MAX)
783            .await
784            .unwrap();
785        oracle
786            .set_bandwidth(pk2.clone(), usize::MAX, receiver_bps.unwrap_or(usize::MAX))
787            .await
788            .unwrap();
789
790        // Link the two agents
791        oracle
792            .add_link(
793                pk1.clone(),
794                pk2.clone(),
795                Link {
796                    // No latency so it doesn't interfere with bandwidth delay calculation
797                    latency: Duration::ZERO,
798                    jitter: Duration::ZERO,
799                    success_rate: 1.0,
800                },
801            )
802            .await
803            .unwrap();
804
805        // Send a message from agent 1 to 2
806        let msg = Bytes::from(vec![42u8; message_size]);
807        let start = context.current();
808        sender
809            .send(Recipients::One(pk2.clone()), msg.clone(), true)
810            .await
811            .unwrap();
812
813        // Measure how long it takes for agent 2 to receive the message
814        let (origin, received) = receiver.recv().await.unwrap();
815        let elapsed = context.current().duration_since(start).unwrap();
816
817        assert_eq!(origin, pk1);
818        assert_eq!(received, msg);
819        assert!(
820            elapsed >= Duration::from_millis(expected_duration_ms),
821            "Message arrived too quickly: {elapsed:?} (expected >= {expected_duration_ms}ms)"
822        );
823        assert!(
824            elapsed < Duration::from_millis(expected_duration_ms + 100),
825            "Message took too long: {elapsed:?} (expected ~{expected_duration_ms}ms)"
826        );
827    }
828
829    #[test]
830    fn test_bandwidth() {
831        let executor = deterministic::Runner::default();
832        executor.start(|mut context| async move {
833            let (network, mut oracle) = Network::new(
834                context.with_label("network"),
835                Config {
836                    max_size: 1024 * 1024,
837                },
838            );
839            network.start();
840
841            // Both sender and receiver have the same bandiwdth (1000 B/s)
842            // 500 bytes at 1000 B/s = 0.5 seconds
843            test_bandwidth_between_peers(
844                &mut context,
845                &mut oracle,
846                Some(1000), // sender egress
847                Some(1000), // receiver ingress
848                500,        // message size
849                500,        // expected duration in ms
850            )
851            .await;
852
853            // Sender has lower bandwidth (500 B/s) than receiver (2000 B/s)
854            // Should be limited by sender's 500 B/s
855            // 250 bytes at 500 B/s = 0.5 seconds
856            test_bandwidth_between_peers(
857                &mut context,
858                &mut oracle,
859                Some(500),  // sender egress
860                Some(2000), // receiver ingress
861                250,        // message size
862                500,        // expected duration in ms
863            )
864            .await;
865
866            // Sender has higher bandwidth (2000 B/s) than receiver (500 B/s)
867            // Should be limited by receiver's 500 B/s
868            // 250 bytes at 500 B/s = 0.5 seconds
869            test_bandwidth_between_peers(
870                &mut context,
871                &mut oracle,
872                Some(2000), // sender egress
873                Some(500),  // receiver ingress
874                250,        // message size
875                500,        // expected duration in ms
876            )
877            .await;
878
879            // Unlimited sender, limited receiver
880            // Should be limited by receiver's 1000 B/s
881            // 500 bytes at 1000 B/s = 0.5 seconds
882            test_bandwidth_between_peers(
883                &mut context,
884                &mut oracle,
885                None,       // sender egress (unlimited)
886                Some(1000), // receiver ingress
887                500,        // message size
888                500,        // expected duration in ms
889            )
890            .await;
891
892            // Limited sender, unlimited receiver
893            // Should be limited by sender's 1000 B/s
894            // 500 bytes at 1000 B/s = 0.5 seconds
895            test_bandwidth_between_peers(
896                &mut context,
897                &mut oracle,
898                Some(1000), // sender egress
899                None,       // receiver ingress (unlimited)
900                500,        // message size
901                500,        // expected duration in ms
902            )
903            .await;
904
905            // Unlimited sender, unlimited receiver
906            // Delivery should be (almost) instant
907            test_bandwidth_between_peers(
908                &mut context,
909                &mut oracle,
910                None, // sender egress (unlimited)
911                None, // receiver ingress (unlimited)
912                500,  // message size
913                0,    // expected duration in ms
914            )
915            .await;
916        });
917    }
918
919    #[test]
920    fn test_bandwidth_contention() {
921        // Test bandwidth contention with many peers (one-to-many and many-to-one scenarios)
922        let executor = deterministic::Runner::default();
923        executor.start(|context| async move {
924            let (network, mut oracle) = Network::new(
925                context.with_label("network"),
926                Config {
927                    max_size: 1024 * 1024,
928                },
929            );
930            network.start();
931
932            // Configuration
933            const NUM_PEERS: usize = 100;
934            const MESSAGE_SIZE: usize = 1000; // 1KB per message
935            const EFFECTIVE_BPS: usize = 10_000; // 10KB/s egress/ingress per peer
936
937            // Create peers
938            let mut peers = Vec::with_capacity(NUM_PEERS + 1);
939            let mut senders = Vec::with_capacity(NUM_PEERS + 1);
940            let mut receivers = Vec::with_capacity(NUM_PEERS + 1);
941
942            // Create the main peer (index 0) and 100 other peers
943            for i in 0..=NUM_PEERS {
944                let pk = PrivateKey::from_seed(i as u64).public_key();
945                let (sender, receiver) = oracle.register(pk.clone(), 0).await.unwrap();
946                peers.push(pk);
947                senders.push(sender);
948                receivers.push(receiver);
949            }
950
951            // Set bandwidth limits for all peers
952            for pk in &peers {
953                oracle
954                    .set_bandwidth(pk.clone(), EFFECTIVE_BPS, EFFECTIVE_BPS)
955                    .await
956                    .unwrap();
957            }
958
959            // Link all peers to the main peer (peers[0]) with zero latency
960            for peer in peers.iter().skip(1) {
961                oracle
962                    .add_link(
963                        peer.clone(),
964                        peers[0].clone(),
965                        Link {
966                            latency: Duration::ZERO,
967                            jitter: Duration::ZERO,
968                            success_rate: 1.0,
969                        },
970                    )
971                    .await
972                    .unwrap();
973                oracle
974                    .add_link(
975                        peers[0].clone(),
976                        peer.clone(),
977                        Link {
978                            latency: Duration::ZERO,
979                            jitter: Duration::ZERO,
980                            success_rate: 1.0,
981                        },
982                    )
983                    .await
984                    .unwrap();
985            }
986
987            // One-to-many (main peer sends to all others). Verifies that bandwidth limits
988            // are properly enforced when sending to multiple recipients
989            let start = context.current();
990
991            // Send message to all peers concurrently
992            // and wait for all sends to be acknowledged
993            join_all(peers.iter().skip(1).map(|peer| {
994                let mut sender = senders[0].clone();
995                let recipient = peer.clone();
996                let msg = Bytes::from(vec![0u8; MESSAGE_SIZE]);
997                context.clone().spawn(|_| async move {
998                    sender
999                        .send(Recipients::One(recipient), msg, true)
1000                        .await
1001                        .unwrap()
1002                })
1003            }))
1004            .await;
1005
1006            let elapsed = context.current().duration_since(start).unwrap();
1007
1008            // Calculate expected time
1009            let expected_ms = (NUM_PEERS * MESSAGE_SIZE * 1000) / EFFECTIVE_BPS;
1010
1011            assert!(
1012                elapsed >= Duration::from_millis(expected_ms as u64),
1013                "One-to-many completed too quickly: {elapsed:?} (expected >= {expected_ms}ms)"
1014            );
1015            assert!(
1016                elapsed < Duration::from_millis((expected_ms as u64) + 500),
1017                "One-to-many took too long: {elapsed:?} (expected ~{expected_ms}ms)"
1018            );
1019
1020            // Verify all messages are received
1021            for receiver in receivers.iter_mut().skip(1) {
1022                let (origin, received) = receiver.recv().await.unwrap();
1023                assert_eq!(origin, peers[0]);
1024                assert_eq!(received.len(), MESSAGE_SIZE);
1025            }
1026
1027            // Many-to-one (all peers send to the main peer)
1028            let start = context.current();
1029
1030            // Each peer sends a message to the main peer concurrently and we wait for all
1031            // sends to be acknowledged
1032            join_all(senders.iter().skip(1).map(|sender| {
1033                let mut sender = sender.clone();
1034                let recipient = peers[0].clone();
1035                let msg = Bytes::from(vec![0; MESSAGE_SIZE]);
1036                context.clone().spawn(|_| async move {
1037                    sender
1038                        .send(Recipients::One(recipient), msg, true)
1039                        .await
1040                        .unwrap()
1041                })
1042            }))
1043            .await;
1044
1045            // Collect all messages at the main peer
1046            let mut received_from = HashSet::new();
1047            for _ in 1..=NUM_PEERS {
1048                let (origin, received) = receivers[0].recv().await.unwrap();
1049                assert_eq!(received.len(), MESSAGE_SIZE);
1050                assert!(
1051                    received_from.insert(origin.clone()),
1052                    "Received duplicate from {origin:?}"
1053                );
1054            }
1055
1056            let elapsed = context.current().duration_since(start).unwrap();
1057
1058            // Calculate expected time
1059            let expected_ms = (NUM_PEERS * MESSAGE_SIZE * 1000) / EFFECTIVE_BPS;
1060
1061            assert!(
1062                elapsed >= Duration::from_millis(expected_ms as u64),
1063                "Many-to-one completed too quickly: {elapsed:?} (expected >= {expected_ms}ms)"
1064            );
1065            assert!(
1066                elapsed < Duration::from_millis((expected_ms as u64) + 500),
1067                "Many-to-one took too long: {elapsed:?} (expected ~{expected_ms}ms)"
1068            );
1069
1070            // Verify we received from all peers
1071            assert_eq!(received_from.len(), NUM_PEERS);
1072            for peer in peers.iter().skip(1) {
1073                assert!(received_from.contains(peer));
1074            }
1075        });
1076    }
1077
1078    #[test]
1079    fn test_message_ordering() {
1080        // Test that messages arrive in order even with variable latency
1081        let executor = deterministic::Runner::default();
1082        executor.start(|context| async move {
1083            let (network, mut oracle) = Network::new(
1084                context.with_label("network"),
1085                Config {
1086                    max_size: 1024 * 1024,
1087                },
1088            );
1089            network.start();
1090
1091            // Register agents
1092            let pk1 = PrivateKey::from_seed(1).public_key();
1093            let pk2 = PrivateKey::from_seed(2).public_key();
1094            let (mut sender, _) = oracle.register(pk1.clone(), 0).await.unwrap();
1095            let (_, mut receiver) = oracle.register(pk2.clone(), 0).await.unwrap();
1096
1097            // Link agents with high jitter to create variable delays
1098            oracle
1099                .add_link(
1100                    pk1.clone(),
1101                    pk2.clone(),
1102                    Link {
1103                        latency: Duration::from_millis(50),
1104                        jitter: Duration::from_millis(40),
1105                        success_rate: 1.0,
1106                    },
1107                )
1108                .await
1109                .unwrap();
1110
1111            // Send multiple messages that should arrive in order
1112            let messages = vec![
1113                Bytes::from("message 1"),
1114                Bytes::from("message 2"),
1115                Bytes::from("message 3"),
1116                Bytes::from("message 4"),
1117                Bytes::from("message 5"),
1118            ];
1119
1120            for msg in messages.clone() {
1121                sender
1122                    .send(Recipients::One(pk2.clone()), msg, true)
1123                    .await
1124                    .unwrap();
1125            }
1126
1127            // Receive messages and verify they arrive in order
1128            for expected_msg in messages {
1129                let (origin, received_msg) = receiver.recv().await.unwrap();
1130                assert_eq!(origin, pk1);
1131                assert_eq!(received_msg, expected_msg);
1132            }
1133        })
1134    }
1135
1136    #[test]
1137    fn test_many_to_one_bandwidth_sharing() {
1138        let executor = deterministic::Runner::default();
1139        executor.start(|context| async move {
1140            let (network, mut oracle) = Network::new(
1141                context.with_label("network"),
1142                Config {
1143                    max_size: 1024 * 1024,
1144                },
1145            );
1146            network.start();
1147
1148            // Create 10 senders and 1 receiver
1149            let mut senders = Vec::new();
1150            let mut sender_txs = Vec::new();
1151            for i in 0..10 {
1152                let sender = ed25519::PrivateKey::from_seed(i).public_key();
1153                senders.push(sender.clone());
1154                let (tx, _) = oracle.register(sender.clone(), 0).await.unwrap();
1155                sender_txs.push(tx);
1156
1157                // Each sender has 10KB/s egress
1158                oracle
1159                    .set_bandwidth(sender.clone(), 10_000, usize::MAX)
1160                    .await
1161                    .unwrap();
1162            }
1163
1164            let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1165            let (_, mut receiver_rx) = oracle.register(receiver.clone(), 0).await.unwrap();
1166
1167            // Receiver has 100KB/s ingress
1168            oracle
1169                .set_bandwidth(receiver.clone(), usize::MAX, 100_000)
1170                .await
1171                .unwrap();
1172
1173            // Add links with no latency
1174            for sender in &senders {
1175                oracle
1176                    .add_link(
1177                        sender.clone(),
1178                        receiver.clone(),
1179                        Link {
1180                            latency: Duration::ZERO,
1181                            jitter: Duration::ZERO,
1182                            success_rate: 1.0,
1183                        },
1184                    )
1185                    .await
1186                    .unwrap();
1187            }
1188
1189            let start = context.current();
1190
1191            // All senders send 10KB simultaneously
1192            let mut handles = Vec::new();
1193            for (i, mut tx) in sender_txs.into_iter().enumerate() {
1194                let receiver_clone = receiver.clone();
1195                let handle = context.clone().spawn(move |_| async move {
1196                    let msg = Bytes::from(vec![i as u8; 10_000]);
1197                    tx.send(Recipients::One(receiver_clone), msg, true)
1198                        .await
1199                        .unwrap();
1200                });
1201                handles.push(handle);
1202            }
1203
1204            // Wait for all sends to complete
1205            for handle in handles {
1206                handle.await.unwrap();
1207            }
1208
1209            // All 10 messages should be received at ~1s
1210            // (100KB total data at 100KB/s aggregate bandwidth)
1211            for i in 0..10 {
1212                let (_, _msg) = receiver_rx.recv().await.unwrap();
1213                let recv_time = context.current().duration_since(start).unwrap();
1214
1215                // Messages should all complete around 1s
1216                assert!(
1217                    recv_time >= Duration::from_millis(950)
1218                        && recv_time <= Duration::from_millis(1100),
1219                    "Message {i} received at {recv_time:?}, expected ~1s",
1220                );
1221            }
1222        });
1223    }
1224
1225    #[test]
1226    fn test_one_to_many_fast_sender() {
1227        // Test that 1 fast sender (100KB/s) sending to 10 receivers (10KB/s each)
1228        // should complete all sends in ~1s and all messages received in ~1s
1229        let executor = deterministic::Runner::default();
1230        executor.start(|context| async move {
1231            let (network, mut oracle) = Network::new(
1232                context.with_label("network"),
1233                Config {
1234                    max_size: 1024 * 1024,
1235                },
1236            );
1237            network.start();
1238
1239            // Create fast sender
1240            let sender = ed25519::PrivateKey::from_seed(0).public_key();
1241            let (sender_tx, _) = oracle.register(sender.clone(), 0).await.unwrap();
1242
1243            // Sender has 100KB/s egress
1244            oracle
1245                .set_bandwidth(sender.clone(), 100_000, usize::MAX)
1246                .await
1247                .unwrap();
1248
1249            // Create 10 receivers
1250            let mut receivers = Vec::new();
1251            let mut receiver_rxs = Vec::new();
1252            for i in 0..10 {
1253                let receiver = ed25519::PrivateKey::from_seed(i + 1).public_key();
1254                receivers.push(receiver.clone());
1255                let (_, rx) = oracle.register(receiver.clone(), 0).await.unwrap();
1256                receiver_rxs.push(rx);
1257
1258                // Each receiver has 10KB/s ingress
1259                oracle
1260                    .set_bandwidth(receiver.clone(), usize::MAX, 10_000)
1261                    .await
1262                    .unwrap();
1263
1264                // Add link with no latency
1265                oracle
1266                    .add_link(
1267                        sender.clone(),
1268                        receiver.clone(),
1269                        Link {
1270                            latency: Duration::ZERO,
1271                            jitter: Duration::ZERO,
1272                            success_rate: 1.0,
1273                        },
1274                    )
1275                    .await
1276                    .unwrap();
1277            }
1278
1279            let start = context.current();
1280
1281            // Send 10KB to each receiver (100KB total)
1282            let mut handles = Vec::new();
1283            for (i, receiver) in receivers.iter().enumerate() {
1284                let mut sender_tx = sender_tx.clone();
1285                let receiver_clone = receiver.clone();
1286                let handle = context.clone().spawn(move |_| async move {
1287                    let msg = Bytes::from(vec![i as u8; 10_000]);
1288                    sender_tx
1289                        .send(Recipients::One(receiver_clone), msg, true)
1290                        .await
1291                        .unwrap();
1292                });
1293                handles.push(handle);
1294            }
1295
1296            // Wait for all sends to complete
1297            for handle in handles {
1298                handle.await.unwrap();
1299            }
1300
1301            let send_time = context.current().duration_since(start).unwrap();
1302
1303            // Sender should send all 100KB in ~1s at 100KB/s
1304            assert!(
1305                send_time >= Duration::from_millis(950) && send_time <= Duration::from_millis(1100),
1306                "Sender took {send_time:?} to send 100KB, expected ~1s",
1307            );
1308
1309            // Each receiver should receive their 10KB message in ~1s (10KB at 10KB/s)
1310            for (i, mut rx) in receiver_rxs.into_iter().enumerate() {
1311                let (_, msg) = rx.recv().await.unwrap();
1312                assert_eq!(msg[0], i as u8);
1313                let recv_time = context.current().duration_since(start).unwrap();
1314
1315                // All messages should be received around 1s
1316                assert!(
1317                    recv_time >= Duration::from_millis(950)
1318                        && recv_time <= Duration::from_millis(1100),
1319                    "Receiver {i} received at {recv_time:?}, expected ~1s",
1320                );
1321            }
1322        });
1323    }
1324
1325    #[test]
1326    fn test_many_slow_senders_to_fast_receiver() {
1327        // Test that 10 slow senders (1KB/s each) sending to a fast receiver (10KB/s)
1328        // should complete all transfers in ~1s
1329        let executor = deterministic::Runner::default();
1330        executor.start(|context| async move {
1331            let (network, mut oracle) = Network::new(
1332                context.with_label("network"),
1333                Config {
1334                    max_size: 1024 * 1024,
1335                },
1336            );
1337            network.start();
1338
1339            // Create 10 slow senders
1340            let mut senders = Vec::new();
1341            let mut sender_txs = Vec::new();
1342            for i in 0..10 {
1343                let sender = ed25519::PrivateKey::from_seed(i).public_key();
1344                senders.push(sender.clone());
1345                let (tx, _) = oracle.register(sender.clone(), 0).await.unwrap();
1346                sender_txs.push(tx);
1347
1348                // Each sender has 1KB/s egress (slow)
1349                oracle
1350                    .set_bandwidth(sender.clone(), 1_000, usize::MAX)
1351                    .await
1352                    .unwrap();
1353            }
1354
1355            // Create fast receiver
1356            let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1357            let (_, mut receiver_rx) = oracle.register(receiver.clone(), 0).await.unwrap();
1358
1359            // Receiver has 10KB/s ingress (can handle all 10 senders at full speed)
1360            oracle
1361                .set_bandwidth(receiver.clone(), usize::MAX, 10_000)
1362                .await
1363                .unwrap();
1364
1365            // Add links with no latency
1366            for sender in &senders {
1367                oracle
1368                    .add_link(
1369                        sender.clone(),
1370                        receiver.clone(),
1371                        Link {
1372                            latency: Duration::ZERO,
1373                            jitter: Duration::ZERO,
1374                            success_rate: 1.0,
1375                        },
1376                    )
1377                    .await
1378                    .unwrap();
1379            }
1380
1381            let start = context.current();
1382
1383            // All senders send 1KB simultaneously
1384            let mut handles = Vec::new();
1385            for (i, mut tx) in sender_txs.into_iter().enumerate() {
1386                let receiver_clone = receiver.clone();
1387                let handle = context.clone().spawn(move |_| async move {
1388                    let msg = Bytes::from(vec![i as u8; 1_000]);
1389                    tx.send(Recipients::One(receiver_clone), msg, true)
1390                        .await
1391                        .unwrap();
1392                });
1393                handles.push(handle);
1394            }
1395
1396            // Wait for all sends to complete
1397            for handle in handles {
1398                handle.await.unwrap();
1399            }
1400
1401            // Each sender takes 1s to transmit 1KB at 1KB/s
1402            // All transmissions happen in parallel, so total send time is ~1s
1403
1404            // All 10 messages (10KB total) should be received at ~1s
1405            // Receiver processes at 10KB/s, can handle all 10KB in 1s
1406            for i in 0..10 {
1407                let (_, _msg) = receiver_rx.recv().await.unwrap();
1408                let recv_time = context.current().duration_since(start).unwrap();
1409
1410                // All messages should complete around 1s
1411                assert!(
1412                    recv_time >= Duration::from_millis(950)
1413                        && recv_time <= Duration::from_millis(1100),
1414                    "Message {i} received at {recv_time:?}, expected ~1s",
1415                );
1416            }
1417        });
1418    }
1419
1420    #[test]
1421    fn test_dynamic_bandwidth_allocation_staggered() {
1422        // Test that bandwidth is dynamically allocated as
1423        // transfers start and complete at different times
1424        //
1425        // 3 senders to 1 receiver, starting at different times
1426        // Receiver has 30KB/s, senders each have 30KB/s
1427        let executor = deterministic::Runner::default();
1428        executor.start(|context| async move {
1429            let (network, mut oracle) = Network::new(
1430                context.with_label("network"),
1431                Config {
1432                    max_size: 1024 * 1024,
1433                },
1434            );
1435            network.start();
1436
1437            // Create 3 senders
1438            let mut senders = Vec::new();
1439            let mut sender_txs = Vec::new();
1440            for i in 0..3 {
1441                let sender = ed25519::PrivateKey::from_seed(i).public_key();
1442                senders.push(sender.clone());
1443                let (tx, _) = oracle.register(sender.clone(), 0).await.unwrap();
1444                sender_txs.push(tx);
1445
1446                // Each sender has 30KB/s egress
1447                oracle
1448                    .set_bandwidth(sender.clone(), 30_000, usize::MAX)
1449                    .await
1450                    .unwrap();
1451            }
1452
1453            // Create receiver with 30KB/s ingress
1454            let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1455            let (_, mut receiver_rx) = oracle.register(receiver.clone(), 0).await.unwrap();
1456            oracle
1457                .set_bandwidth(receiver.clone(), usize::MAX, 30_000)
1458                .await
1459                .unwrap();
1460
1461            // Add links with minimal latency
1462            for sender in &senders {
1463                oracle
1464                    .add_link(
1465                        sender.clone(),
1466                        receiver.clone(),
1467                        Link {
1468                            latency: Duration::from_millis(1),
1469                            jitter: Duration::ZERO,
1470                            success_rate: 1.0,
1471                        },
1472                    )
1473                    .await
1474                    .unwrap();
1475            }
1476
1477            let start = context.current();
1478
1479            // Sender 0: sends 30KB at t=0
1480            // Should get full 30KB/s bandwidth, completes at t=1s
1481            let mut tx0 = sender_txs[0].clone();
1482            let rx_clone = receiver.clone();
1483            context.clone().spawn(move |_| async move {
1484                let msg = Bytes::from(vec![0u8; 30_000]);
1485                tx0.send(Recipients::One(rx_clone), msg, true)
1486                    .await
1487                    .unwrap();
1488            });
1489
1490            // Sender 1: sends 30KB at t=0.5s
1491            // Should share bandwidth with sender 0 for 0.5s (15KB/s each)
1492            // Then get full bandwidth after sender 0 completes
1493            let mut tx1 = sender_txs[1].clone();
1494            let rx_clone = receiver.clone();
1495            context.clone().spawn(move |context| async move {
1496                context.sleep(Duration::from_millis(500)).await;
1497                let msg = Bytes::from(vec![1u8; 30_000]);
1498                tx1.send(Recipients::One(rx_clone), msg, true)
1499                    .await
1500                    .unwrap();
1501            });
1502
1503            // Sender 2: sends 15KB at t=1.5s
1504            // Should get full bandwidth since others are done
1505            // Completes at t=2s (0.5s transmission)
1506            let mut tx2 = sender_txs[2].clone();
1507            let rx_clone = receiver.clone();
1508            context.clone().spawn(move |context| async move {
1509                context.sleep(Duration::from_millis(1500)).await;
1510                let msg = Bytes::from(vec![2u8; 15_000]);
1511                tx2.send(Recipients::One(rx_clone), msg, true)
1512                    .await
1513                    .unwrap();
1514            });
1515
1516            // Receive and verify timing
1517            // Message 0: starts at t=0, gets full 30KB/s, completes at t=1s
1518            let (_, msg0) = receiver_rx.recv().await.unwrap();
1519            assert_eq!(msg0[0], 0);
1520            let t0 = context.current().duration_since(start).unwrap();
1521            assert!(
1522                t0 >= Duration::from_millis(1000) && t0 <= Duration::from_millis(1100),
1523                "Message 0 received at {t0:?}, expected ~1s",
1524            );
1525
1526            // The algorithm may deliver messages in a different order based on
1527            // efficient bandwidth usage. Let's collect the next two messages and
1528            // verify their timings regardless of order.
1529            let (_, msg_a) = receiver_rx.recv().await.unwrap();
1530            let t_a = context.current().duration_since(start).unwrap();
1531
1532            let (_, msg_b) = receiver_rx.recv().await.unwrap();
1533            let t_b = context.current().duration_since(start).unwrap();
1534
1535            // Figure out which message is which based on content
1536            let (msg1, t1, msg2, t2) = if msg_a[0] == 1 {
1537                (msg_a, t_a, msg_b, t_b)
1538            } else {
1539                (msg_b, t_b, msg_a, t_a)
1540            };
1541
1542            assert_eq!(msg1[0], 1);
1543            assert_eq!(msg2[0], 2);
1544
1545            // Message 1 (30KB) started at t=0.5s
1546            // Message 2 (15KB) started at t=1.5s
1547            // With efficient scheduling, message 2 might complete first since it's smaller
1548            // Both should complete between 1.5s and 2.5s
1549            assert!(
1550                t1 >= Duration::from_millis(1500) && t1 <= Duration::from_millis(2600),
1551                "Message 1 received at {t1:?}, expected between 1.5s-2.6s",
1552            );
1553
1554            assert!(
1555                t2 >= Duration::from_millis(1500) && t2 <= Duration::from_millis(2600),
1556                "Message 2 received at {t2:?}, expected between 1.5s-2.6s",
1557            );
1558        });
1559    }
1560
1561    #[test]
1562    fn test_dynamic_bandwidth_varied_sizes() {
1563        // Test dynamic allocation with different message sizes arriving simultaneously
1564        // This tests that smaller messages complete first when bandwidth is shared
1565        let executor = deterministic::Runner::default();
1566        executor.start(|context| async move {
1567            let (network, mut oracle) = Network::new(
1568                context.with_label("network"),
1569                Config {
1570                    max_size: 1024 * 1024,
1571                },
1572            );
1573            network.start();
1574
1575            // Create 3 senders
1576            let mut senders = Vec::new();
1577            let mut sender_txs = Vec::new();
1578            for i in 0..3 {
1579                let sender = ed25519::PrivateKey::from_seed(i).public_key();
1580                senders.push(sender.clone());
1581                let (tx, _) = oracle.register(sender.clone(), 0).await.unwrap();
1582                sender_txs.push(tx);
1583
1584                // Each sender has unlimited egress
1585                oracle
1586                    .set_bandwidth(sender.clone(), usize::MAX, usize::MAX)
1587                    .await
1588                    .unwrap();
1589            }
1590
1591            // Create receiver with 30KB/s ingress
1592            let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1593            let (_, mut receiver_rx) = oracle.register(receiver.clone(), 0).await.unwrap();
1594            oracle
1595                .set_bandwidth(receiver.clone(), usize::MAX, 30_000)
1596                .await
1597                .unwrap();
1598
1599            // Add links
1600            for sender in &senders {
1601                oracle
1602                    .add_link(
1603                        sender.clone(),
1604                        receiver.clone(),
1605                        Link {
1606                            latency: Duration::from_millis(1),
1607                            jitter: Duration::ZERO,
1608                            success_rate: 1.0,
1609                        },
1610                    )
1611                    .await
1612                    .unwrap();
1613            }
1614
1615            let start = context.current();
1616
1617            // All start at the same time but with different sizes
1618            //
1619            // The scheduler reserves bandwidth in advance, the actual behavior
1620            // depends on the order tasks are processed. Since all senders
1621            // start at once, they'll compete for bandwidth
1622
1623            let sizes = [10_000, 20_000, 30_000];
1624            let mut handles = Vec::new();
1625
1626            for (i, (mut tx, size)) in sender_txs.into_iter().zip(sizes.iter()).enumerate() {
1627                let rx_clone = receiver.clone();
1628                let msg_size = *size;
1629                let handle = context.clone().spawn(move |_| async move {
1630                    let msg = Bytes::from(vec![i as u8; msg_size]);
1631                    tx.send(Recipients::One(rx_clone), msg, true).await.unwrap();
1632                });
1633                handles.push(handle);
1634            }
1635
1636            // Wait for all sends to complete
1637            for handle in handles {
1638                handle.await.unwrap();
1639            }
1640
1641            // Receive messages. They arrive in the order they were scheduled,
1642            // not necessarily size order. Collect all messages and sort by
1643            // receive time to verify timing
1644            let mut messages = Vec::new();
1645            for _ in 0..3 {
1646                let (_, msg) = receiver_rx.recv().await.unwrap();
1647                let t = context.current().duration_since(start).unwrap();
1648                messages.push((msg[0] as usize, msg.len(), t));
1649            }
1650
1651            // When all start at once, they'll reserve bandwidth slots
1652            // sequentially. First gets full 30KB/s, others wait or get
1653            // remaining bandwidth. Just verify all messages arrived and total
1654            // time is reasonable
1655            assert_eq!(messages.len(), 3);
1656
1657            // Total data is 60KB at 30KB/s receiver ingress = 2s minimum
1658            let max_time = messages.iter().map(|&(_, _, t)| t).max().unwrap();
1659            assert!(
1660                max_time >= Duration::from_millis(2000),
1661                "Total time {max_time:?} should be at least 2s for 60KB at 30KB/s",
1662            );
1663        });
1664    }
1665
1666    #[test]
1667    fn test_bandwidth_pipe_reservation_duration() {
1668        // Test that bandwidth pipe is only reserved for transmission duration, not latency
1669        // This means new messages can start transmitting while others are still in flight
1670        let executor = deterministic::Runner::default();
1671        executor.start(|context| async move {
1672            let (network, mut oracle) = Network::new(
1673                context.with_label("network"),
1674                Config {
1675                    max_size: 1024 * 1024,
1676                },
1677            );
1678            network.start();
1679
1680            // Create two peers
1681            let sender = PrivateKey::from_seed(1).public_key();
1682            let receiver = PrivateKey::from_seed(2).public_key();
1683
1684            let (sender_tx, _) = oracle.register(sender.clone(), 0).await.unwrap();
1685            let (_, mut receiver_rx) = oracle.register(receiver.clone(), 0).await.unwrap();
1686
1687            // Set bandwidth: 1000 B/s (1 byte per millisecond)
1688            oracle
1689                .set_bandwidth(sender.clone(), 1000, usize::MAX)
1690                .await
1691                .unwrap();
1692            oracle
1693                .set_bandwidth(receiver.clone(), usize::MAX, 1000)
1694                .await
1695                .unwrap();
1696
1697            // Add link with significant latency (1 second)
1698            oracle
1699                .add_link(
1700                    sender.clone(),
1701                    receiver.clone(),
1702                    Link {
1703                        latency: Duration::from_secs(1), // 1 second latency
1704                        jitter: Duration::ZERO,
1705                        success_rate: 1.0,
1706                    },
1707                )
1708                .await
1709                .unwrap();
1710
1711            // Send 3 messages of 500 bytes each
1712            // At 1000 B/s, each message takes 500ms to transmit
1713            // With 1s latency, if pipe was reserved for tx+latency, total would be:
1714            //   - Msg 1: 0-1500ms (500ms tx + 1000ms latency)
1715            //   - Msg 2: 1500-3000ms (starts after msg 1 fully delivered)
1716            //   - Msg 3: 3000-4500ms
1717            // But if pipe is only reserved during tx (correct behavior):
1718            //   - Msg 1: tx 0-500ms, delivered at 1500ms
1719            //   - Msg 2: tx 500-1000ms, delivered at 2000ms
1720            //   - Msg 3: tx 1000-1500ms, delivered at 2500ms
1721
1722            let start = context.current();
1723
1724            // Send all messages in quick succession
1725            let mut handles = Vec::new();
1726            for i in 0..3 {
1727                let mut sender_tx = sender_tx.clone();
1728                let receiver = receiver.clone();
1729                let msg = Bytes::from(vec![i; 500]);
1730                let handle = context.clone().spawn(move |context| async move {
1731                    sender_tx
1732                        .send(Recipients::One(receiver), msg, false)
1733                        .await
1734                        .unwrap();
1735
1736                    // Record time when send completes (is acknowledged)
1737                    context.current().duration_since(start).unwrap()
1738                });
1739                handles.push(handle);
1740
1741                // Small delay between spawns to ensure ordering
1742                context.sleep(Duration::from_millis(1)).await;
1743            }
1744
1745            // Wait for all sends to complete and collect their completion times
1746            let mut send_times = Vec::new();
1747            for handle in handles {
1748                let time = handle.await.unwrap();
1749                send_times.push(time);
1750            }
1751
1752            // Verify that sends completed (were acknowledged) once transmission finished,
1753            // not after delivery. The sends should complete at ~500ms, ~1000ms, ~1500ms
1754            for (i, time) in send_times.iter().enumerate() {
1755                // Each message takes 500ms to transmit, and they queue sequentially
1756                let expected_min = i as u64 * 500;
1757                let expected_max = expected_min + 600;
1758
1759                assert!(
1760                    *time >= Duration::from_millis(expected_min)
1761                        && *time <= Duration::from_millis(expected_max),
1762                    "Send {} should be acknowledged at ~{}ms-{}ms, got {:?}",
1763                    i + 1,
1764                    expected_min,
1765                    expected_max,
1766                    time
1767                );
1768            }
1769
1770            // Wait for all receives to complete and record their completion times
1771            let mut receive_times = Vec::new();
1772            for i in 0..3 {
1773                let (_, received) = receiver_rx.recv().await.unwrap();
1774                receive_times.push(context.current().duration_since(start).unwrap());
1775                assert_eq!(received[0], i);
1776            }
1777
1778            // Messages should be received at:
1779            // - Msg 1: ~1500ms (500ms transmission + 1000ms latency)
1780            // - Msg 2: ~2000ms (500ms wait + 500ms transmission + 1000ms latency)
1781            // - Msg 3: ~2500ms (1000ms wait + 500ms transmission + 1000ms latency)
1782            for (i, time) in receive_times.iter().enumerate() {
1783                let expected_min = (i as u64 * 500) + 1500;
1784                let expected_max = expected_min + 100;
1785
1786                assert!(
1787                    *time >= Duration::from_millis(expected_min)
1788                        && *time < Duration::from_millis(expected_max),
1789                    "Message {} should arrive at ~{}ms, got {:?}",
1790                    i + 1,
1791                    expected_min,
1792                    time
1793                );
1794            }
1795        });
1796    }
1797
1798    #[test]
1799    fn test_dynamic_bandwidth_affects_new_transfers() {
1800        // This test verifies that bandwidth changes affect NEW transfers,
1801        // not transfers already in progress (which have their reservations locked in)
1802        let executor = deterministic::Runner::default();
1803        executor.start(|context| async move {
1804            let (network, mut oracle) = Network::new(
1805                context.with_label("network"),
1806                Config {
1807                    max_size: 1024 * 1024,
1808                },
1809            );
1810            network.start();
1811
1812            let pk_sender = PrivateKey::from_seed(1).public_key();
1813            let pk_receiver = PrivateKey::from_seed(2).public_key();
1814
1815            // Register peers and establish link
1816            let (mut sender_tx, _) = oracle.register(pk_sender.clone(), 0).await.unwrap();
1817            let (_, mut receiver_rx) = oracle.register(pk_receiver.clone(), 0).await.unwrap();
1818            oracle
1819                .add_link(
1820                    pk_sender.clone(),
1821                    pk_receiver.clone(),
1822                    Link {
1823                        latency: Duration::from_millis(1), // Small latency
1824                        jitter: Duration::ZERO,
1825                        success_rate: 1.0,
1826                    },
1827                )
1828                .await
1829                .unwrap();
1830
1831            // Initial bandwidth: 10 KB/s
1832            oracle
1833                .set_bandwidth(pk_sender.clone(), 10_000, usize::MAX)
1834                .await
1835                .unwrap();
1836            oracle
1837                .set_bandwidth(pk_receiver.clone(), usize::MAX, 10_000)
1838                .await
1839                .unwrap();
1840
1841            // Send first message at 10 KB/s
1842            let msg1 = Bytes::from(vec![1u8; 20_000]); // 20 KB
1843            let start_time = context.current();
1844            sender_tx
1845                .send(Recipients::One(pk_receiver.clone()), msg1.clone(), false)
1846                .await
1847                .unwrap();
1848
1849            // Receive first message (should take ~2s at 10KB/s)
1850            let (_sender, received_msg1) = receiver_rx.recv().await.unwrap();
1851            let msg1_time = context.current().duration_since(start_time).unwrap();
1852            assert_eq!(received_msg1.len(), 20_000);
1853            assert!(
1854                msg1_time >= Duration::from_millis(1999)
1855                    && msg1_time <= Duration::from_millis(2010),
1856                "First message should take ~2s, got {msg1_time:?}",
1857            );
1858
1859            // Change bandwidth to 2 KB/s
1860            oracle
1861                .set_bandwidth(pk_sender.clone(), 2_000, usize::MAX)
1862                .await
1863                .unwrap();
1864
1865            // Send second message at new bandwidth
1866            let msg2 = Bytes::from(vec![2u8; 10_000]); // 10 KB
1867            let msg2_start = context.current();
1868            sender_tx
1869                .send(Recipients::One(pk_receiver.clone()), msg2.clone(), false)
1870                .await
1871                .unwrap();
1872
1873            // Receive second message (should take ~5s at 2KB/s)
1874            let (_sender, received_msg2) = receiver_rx.recv().await.unwrap();
1875            let msg2_time = context.current().duration_since(msg2_start).unwrap();
1876            assert_eq!(received_msg2.len(), 10_000);
1877            assert!(
1878                msg2_time >= Duration::from_millis(4999)
1879                    && msg2_time <= Duration::from_millis(5010),
1880                "Second message should take ~5s at reduced bandwidth, got {msg2_time:?}",
1881            );
1882        });
1883    }
1884
1885    #[test]
1886    fn test_zero_sender_bandwidth_returns_empty() {
1887        // This test verifies that when sender bandwidth is 0, the send returns empty list
1888        let executor = deterministic::Runner::default();
1889        executor.start(|context| async move {
1890            let (network, mut oracle) = Network::new(
1891                context.with_label("network"),
1892                Config {
1893                    max_size: 1024 * 1024,
1894                },
1895            );
1896            network.start();
1897
1898            let pk_sender = PrivateKey::from_seed(1).public_key();
1899            let pk_receiver = PrivateKey::from_seed(2).public_key();
1900
1901            // Register peers and establish link
1902            let (sender_tx, _) = oracle.register(pk_sender.clone(), 0).await.unwrap();
1903            let (_, mut receiver_rx) = oracle.register(pk_receiver.clone(), 0).await.unwrap();
1904            oracle
1905                .add_link(
1906                    pk_sender.clone(),
1907                    pk_receiver.clone(),
1908                    Link {
1909                        latency: Duration::from_millis(10),
1910                        jitter: Duration::ZERO,
1911                        success_rate: 1.0,
1912                    },
1913                )
1914                .await
1915                .unwrap();
1916
1917            // Set sender bandwidth to 0
1918            oracle
1919                .set_bandwidth(pk_sender.clone(), 0, usize::MAX)
1920                .await
1921                .unwrap();
1922            oracle
1923                .set_bandwidth(pk_receiver.clone(), usize::MAX, 10_000)
1924                .await
1925                .unwrap();
1926
1927            let msg = Bytes::from(vec![1u8; 1000]);
1928
1929            // Try to send, this should return empty list
1930            let mut sender = sender_tx.clone();
1931            let result = sender
1932                .send(Recipients::One(pk_receiver.clone()), msg.clone(), false)
1933                .await
1934                .unwrap();
1935
1936            // Should get empty list since sender has zero bandwidth
1937            assert!(result.is_empty());
1938
1939            // Try to receive with timeout, should timeout since nothing was sent
1940            select! {
1941              _ = receiver_rx.recv() => {
1942                panic!(
1943                  "Should not receive message when sender bandwidth is 0"
1944                );
1945              },
1946              _ = context.clone().sleep(Duration::from_secs(2)) => {},
1947            };
1948        });
1949    }
1950
1951    #[test]
1952    fn test_zero_receiver_bandwidth_uses_sender_bandwidth() {
1953        // When receiver bandwidth is 0, transfer uses only sender bandwidth
1954        let executor = deterministic::Runner::default();
1955        executor.start(|context| async move {
1956            let (network, mut oracle) = Network::new(
1957                context.with_label("network"),
1958                Config {
1959                    max_size: 1024 * 1024,
1960                },
1961            );
1962            network.start();
1963
1964            let pk_sender = PrivateKey::from_seed(1).public_key();
1965            let pk_receiver = PrivateKey::from_seed(2).public_key();
1966
1967            // Register peers and establish link
1968            let (mut sender_tx, _) = oracle.register(pk_sender.clone(), 0).await.unwrap();
1969            let (_, mut receiver_rx) = oracle.register(pk_receiver.clone(), 0).await.unwrap();
1970            oracle
1971                .add_link(
1972                    pk_sender.clone(),
1973                    pk_receiver.clone(),
1974                    Link {
1975                        latency: Duration::from_millis(10),
1976                        jitter: Duration::ZERO,
1977                        success_rate: 1.0,
1978                    },
1979                )
1980                .await
1981                .unwrap();
1982
1983            oracle
1984                .set_bandwidth(pk_sender.clone(), 10_000, usize::MAX)
1985                .await
1986                .unwrap();
1987            oracle
1988                .set_bandwidth(pk_receiver.clone(), usize::MAX, 0)
1989                .await
1990                .unwrap();
1991
1992            let msg2 = Bytes::from(vec![2u8; 10_000]); // 10KB
1993            let start2 = context.current();
1994            let sent = sender_tx
1995                .send(Recipients::One(pk_receiver.clone()), msg2.clone(), false)
1996                .await
1997                .unwrap();
1998            let send_time = context.current().duration_since(start2).unwrap();
1999
2000            assert_eq!(sent.len(), 1);
2001            assert_eq!(sent[0], pk_receiver);
2002            // When receiver bandwidth is 0, should still use sender bandwidth (10KB/s)
2003            assert!(
2004                send_time >= Duration::from_millis(999) && send_time <= Duration::from_millis(1010),
2005                "With receiver bandwidth 0, should still use sender bandwidth (~1s), got {send_time:?}",
2006            );
2007
2008            // Message should never arrive
2009            // Try to receive with timeout, should timeout
2010            select! {
2011              _ = receiver_rx.recv() => {
2012                panic!(
2013                  "Should not receive message when receiver bandwidth is 0"
2014                );
2015              },
2016              _ = context.clone().sleep(Duration::from_secs(2)) => {},
2017            };
2018        });
2019    }
2020}