1mod bandwidth;
135mod ingress;
136mod metrics;
137mod network;
138
139use thiserror::Error;
140
141#[derive(Debug, Error)]
143pub enum Error {
144 #[error("message too large: {0}")]
145 MessageTooLarge(usize),
146 #[error("network closed")]
147 NetworkClosed,
148 #[error("not valid to link self")]
149 LinkingSelf,
150 #[error("link already exists")]
151 LinkExists,
152 #[error("link missing")]
153 LinkMissing,
154 #[error("invalid success rate (must be in [0, 1]): {0}")]
155 InvalidSuccessRate(f64),
156 #[error("channel already registered: {0}")]
157 ChannelAlreadyRegistered(u32),
158 #[error("send_frame failed")]
159 SendFrameFailed,
160 #[error("recv_frame failed")]
161 RecvFrameFailed,
162 #[error("bind failed")]
163 BindFailed,
164 #[error("accept failed")]
165 AcceptFailed,
166 #[error("dial failed")]
167 DialFailed,
168 #[error("peer missing")]
169 PeerMissing,
170}
171
172pub use ingress::{Link, Oracle};
173pub use network::{Config, Network, Receiver, Sender};
174
175#[cfg(test)]
176mod tests {
177 use super::*;
178 use crate::{Receiver, Recipients, Sender};
179 use bytes::Bytes;
180 use commonware_cryptography::{
181 ed25519::{self, PrivateKey, PublicKey},
182 PrivateKeyExt as _, Signer as _,
183 };
184 use commonware_macros::select;
185 use commonware_runtime::{deterministic, Clock, Metrics, Runner, Spawner};
186 use futures::{channel::mpsc, future::join_all, SinkExt, StreamExt};
187 use rand::Rng;
188 use std::{
189 collections::{BTreeMap, HashMap, HashSet},
190 time::Duration,
191 };
192
193 fn simulate_messages(seed: u64, size: usize) -> (String, Vec<usize>) {
194 let executor = deterministic::Runner::seeded(seed);
195 executor.start(|context| async move {
196 let (network, mut oracle) = Network::new(
198 context.with_label("network"),
199 Config {
200 max_size: 1024 * 1024,
201 },
202 );
203
204 network.start();
206
207 let mut agents = BTreeMap::new();
209 let (seen_sender, mut seen_receiver) = mpsc::channel(1024);
210 for i in 0..size {
211 let pk = PrivateKey::from_seed(i as u64).public_key();
212 let (sender, mut receiver) = oracle.register(pk.clone(), 0).await.unwrap();
213 agents.insert(pk, sender);
214 let mut agent_sender = seen_sender.clone();
215 context
216 .with_label("agent_receiver")
217 .spawn(move |_| async move {
218 for _ in 0..size {
219 receiver.recv().await.unwrap();
220 }
221 agent_sender.send(i).await.unwrap();
222
223 });
225 }
226
227 let only_inbound = PrivateKey::from_seed(0).public_key();
229 for agent in agents.keys() {
230 if agent == &only_inbound {
231 continue;
233 }
234 for other in agents.keys() {
235 let result = oracle
236 .add_link(
237 agent.clone(),
238 other.clone(),
239 Link {
240 latency: Duration::from_millis(5),
241 jitter: Duration::from_millis(2),
242 success_rate: 0.75,
243 },
244 )
245 .await;
246 if agent == other {
247 assert!(matches!(result, Err(Error::LinkingSelf)));
248 } else {
249 assert!(result.is_ok());
250 }
251 }
252 }
253
254 context
256 .with_label("agent_sender")
257 .spawn(|mut context| async move {
258 let keys = agents.keys().collect::<Vec<_>>();
260
261 loop {
263 let index = context.gen_range(0..keys.len());
264 let sender = keys[index];
265 let msg = format!("hello from {sender:?}");
266 let msg = Bytes::from(msg);
267 let mut message_sender = agents.get(sender).unwrap().clone();
268 let sent = message_sender
269 .send(Recipients::All, msg.clone(), false)
270 .await
271 .unwrap();
272 if sender == &only_inbound {
273 assert_eq!(sent.len(), 0);
274 } else {
275 assert_eq!(sent.len(), keys.len() - 1);
276 }
277 }
278 });
279
280 let mut results = Vec::new();
282 for _ in 0..size {
283 results.push(seen_receiver.next().await.unwrap());
284 }
285 (context.auditor().state(), results)
286 })
287 }
288
289 fn compare_outputs(seeds: u64, size: usize) {
290 let mut outputs = Vec::new();
292 for seed in 0..seeds {
293 outputs.push(simulate_messages(seed, size));
294 }
295
296 for seed in 0..seeds {
298 let output = simulate_messages(seed, size);
299 assert_eq!(output, outputs[seed as usize]);
300 }
301 }
302
303 #[test]
304 fn test_determinism() {
305 compare_outputs(25, 25);
306 }
307
308 #[test]
309 fn test_message_too_big() {
310 let executor = deterministic::Runner::default();
311 executor.start(|mut context| async move {
312 let (network, mut oracle) = Network::new(
314 context.with_label("network"),
315 Config {
316 max_size: 1024 * 1024,
317 },
318 );
319
320 network.start();
322
323 let mut agents = HashMap::new();
325 for i in 0..10 {
326 let pk = PrivateKey::from_seed(i as u64).public_key();
327 let (sender, _) = oracle.register(pk.clone(), 0).await.unwrap();
328 agents.insert(pk, sender);
329 }
330
331 let keys = agents.keys().collect::<Vec<_>>();
333 let index = context.gen_range(0..keys.len());
334 let sender = keys[index];
335 let mut message_sender = agents.get(sender).unwrap().clone();
336 let mut msg = vec![0u8; 1024 * 1024 + 1];
337 context.fill(&mut msg[..]);
338 let result = message_sender
339 .send(Recipients::All, msg.into(), false)
340 .await
341 .unwrap_err();
342
343 assert!(matches!(result, Error::MessageTooLarge(_)));
345 });
346 }
347
348 #[test]
349 fn test_linking_self() {
350 let executor = deterministic::Runner::default();
351 executor.start(|context| async move {
352 let (network, mut oracle) = Network::new(
354 context.with_label("network"),
355 Config {
356 max_size: 1024 * 1024,
357 },
358 );
359
360 network.start();
362
363 let pk = PrivateKey::from_seed(0).public_key();
365 oracle.register(pk.clone(), 0).await.unwrap();
366
367 let result = oracle
369 .add_link(
370 pk.clone(),
371 pk,
372 Link {
373 latency: Duration::from_millis(5),
374 jitter: Duration::from_millis(2),
375 success_rate: 0.75,
376 },
377 )
378 .await;
379
380 assert!(matches!(result, Err(Error::LinkingSelf)));
382 });
383 }
384
385 #[test]
386 fn test_duplicate_channel() {
387 let executor = deterministic::Runner::default();
388 executor.start(|context| async move {
389 let (network, mut oracle) = Network::new(
391 context.with_label("network"),
392 Config {
393 max_size: 1024 * 1024,
394 },
395 );
396
397 network.start();
399
400 let pk = PrivateKey::from_seed(0).public_key();
402 oracle.register(pk.clone(), 0).await.unwrap();
403 let result = oracle.register(pk, 0).await;
404
405 assert!(matches!(result, Err(Error::ChannelAlreadyRegistered(0))));
407 });
408 }
409
410 #[test]
411 fn test_invalid_success_rate() {
412 let executor = deterministic::Runner::default();
413 executor.start(|context| async move {
414 let (network, mut oracle) = Network::new(
416 context.with_label("network"),
417 Config {
418 max_size: 1024 * 1024,
419 },
420 );
421
422 network.start();
424
425 let pk1 = PrivateKey::from_seed(0).public_key();
427 let pk2 = PrivateKey::from_seed(1).public_key();
428 oracle.register(pk1.clone(), 0).await.unwrap();
429 oracle.register(pk2.clone(), 0).await.unwrap();
430
431 let result = oracle
433 .add_link(
434 pk1,
435 pk2,
436 Link {
437 latency: Duration::from_millis(5),
438 jitter: Duration::from_millis(2),
439 success_rate: 1.5,
440 },
441 )
442 .await;
443
444 assert!(matches!(result, Err(Error::InvalidSuccessRate(_))));
446 });
447 }
448
449 #[test]
450 fn test_simple_message_delivery() {
451 let executor = deterministic::Runner::default();
452 executor.start(|context| async move {
453 let (network, mut oracle) = Network::new(
455 context.with_label("network"),
456 Config {
457 max_size: 1024 * 1024,
458 },
459 );
460
461 network.start();
463
464 let pk1 = PrivateKey::from_seed(0).public_key();
466 let pk2 = PrivateKey::from_seed(1).public_key();
467 let (mut sender1, mut receiver1) = oracle.register(pk1.clone(), 0).await.unwrap();
468 let (mut sender2, mut receiver2) = oracle.register(pk2.clone(), 0).await.unwrap();
469
470 let _ = oracle.register(pk1.clone(), 1).await.unwrap();
472 let _ = oracle.register(pk2.clone(), 2).await.unwrap();
473
474 oracle
476 .add_link(
477 pk1.clone(),
478 pk2.clone(),
479 Link {
480 latency: Duration::from_millis(5),
481 jitter: Duration::from_millis(2),
482 success_rate: 1.0,
483 },
484 )
485 .await
486 .unwrap();
487 oracle
488 .add_link(
489 pk2.clone(),
490 pk1.clone(),
491 Link {
492 latency: Duration::from_millis(5),
493 jitter: Duration::from_millis(2),
494 success_rate: 1.0,
495 },
496 )
497 .await
498 .unwrap();
499
500 let msg1 = Bytes::from("hello from pk1");
502 let msg2 = Bytes::from("hello from pk2");
503 sender1
504 .send(Recipients::One(pk2.clone()), msg1.clone(), false)
505 .await
506 .unwrap();
507 sender2
508 .send(Recipients::One(pk1.clone()), msg2.clone(), false)
509 .await
510 .unwrap();
511
512 let (sender, message) = receiver1.recv().await.unwrap();
514 assert_eq!(sender, pk2);
515 assert_eq!(message, msg2);
516 let (sender, message) = receiver2.recv().await.unwrap();
517 assert_eq!(sender, pk1);
518 assert_eq!(message, msg1);
519 });
520 }
521
522 #[test]
523 fn test_send_wrong_channel() {
524 let executor = deterministic::Runner::default();
525 executor.start(|context| async move {
526 let (network, mut oracle) = Network::new(
528 context.with_label("network"),
529 Config {
530 max_size: 1024 * 1024,
531 },
532 );
533
534 network.start();
536
537 let pk1 = PrivateKey::from_seed(0).public_key();
539 let pk2 = PrivateKey::from_seed(1).public_key();
540 let (mut sender1, _) = oracle.register(pk1.clone(), 0).await.unwrap();
541 let (_, mut receiver2) = oracle.register(pk2.clone(), 1).await.unwrap();
542
543 oracle
545 .add_link(
546 pk1,
547 pk2.clone(),
548 Link {
549 latency: Duration::from_millis(5),
550 jitter: Duration::ZERO,
551 success_rate: 1.0,
552 },
553 )
554 .await
555 .unwrap();
556
557 let msg = Bytes::from("hello from pk1");
559 sender1
560 .send(Recipients::One(pk2), msg, false)
561 .await
562 .unwrap();
563
564 select! {
566 _ = receiver2.recv() => {
567 panic!("unexpected message");
568 },
569 _ = context.sleep(Duration::from_secs(1)) => {},
570 }
571 });
572 }
573
574 #[test]
575 fn test_dynamic_peers() {
576 let executor = deterministic::Runner::default();
577 executor.start(|context| async move {
578 let (network, mut oracle) = Network::new(
580 context.with_label("network"),
581 Config {
582 max_size: 1024 * 1024,
583 },
584 );
585
586 network.start();
588
589 let pk1 = PrivateKey::from_seed(0).public_key();
591 let pk2 = PrivateKey::from_seed(1).public_key();
592 let (mut sender1, mut receiver1) = oracle.register(pk1.clone(), 0).await.unwrap();
593 let (mut sender2, mut receiver2) = oracle.register(pk2.clone(), 0).await.unwrap();
594
595 oracle
597 .add_link(
598 pk1.clone(),
599 pk2.clone(),
600 Link {
601 latency: Duration::from_millis(5),
602 jitter: Duration::from_millis(2),
603 success_rate: 1.0,
604 },
605 )
606 .await
607 .unwrap();
608 oracle
609 .add_link(
610 pk2.clone(),
611 pk1.clone(),
612 Link {
613 latency: Duration::from_millis(5),
614 jitter: Duration::from_millis(2),
615 success_rate: 1.0,
616 },
617 )
618 .await
619 .unwrap();
620
621 let msg1 = Bytes::from("attempt 1: hello from pk1");
623 let msg2 = Bytes::from("attempt 1: hello from pk2");
624 sender1
625 .send(Recipients::One(pk2.clone()), msg1.clone(), false)
626 .await
627 .unwrap();
628 sender2
629 .send(Recipients::One(pk1.clone()), msg2.clone(), false)
630 .await
631 .unwrap();
632
633 let (sender, message) = receiver1.recv().await.unwrap();
635 assert_eq!(sender, pk2);
636 assert_eq!(message, msg2);
637 let (sender, message) = receiver2.recv().await.unwrap();
638 assert_eq!(sender, pk1);
639 assert_eq!(message, msg1);
640 });
641 }
642
643 #[test]
644 fn test_dynamic_links() {
645 let executor = deterministic::Runner::default();
646 executor.start(|context| async move {
647 let (network, mut oracle) = Network::new(
649 context.with_label("network"),
650 Config {
651 max_size: 1024 * 1024,
652 },
653 );
654
655 network.start();
657
658 let pk1 = PrivateKey::from_seed(0).public_key();
660 let pk2 = PrivateKey::from_seed(1).public_key();
661 let (mut sender1, mut receiver1) = oracle.register(pk1.clone(), 0).await.unwrap();
662 let (mut sender2, mut receiver2) = oracle.register(pk2.clone(), 0).await.unwrap();
663
664 let msg1 = Bytes::from("attempt 1: hello from pk1");
666 let msg2 = Bytes::from("attempt 1: hello from pk2");
667 sender1
668 .send(Recipients::One(pk2.clone()), msg1.clone(), false)
669 .await
670 .unwrap();
671 sender2
672 .send(Recipients::One(pk1.clone()), msg2.clone(), false)
673 .await
674 .unwrap();
675
676 select! {
678 _ = receiver1.recv() => {
679 panic!("unexpected message");
680 },
681 _ = receiver2.recv() => {
682 panic!("unexpected message");
683 },
684 _ = context.sleep(Duration::from_secs(1)) => {},
685 }
686
687 oracle
689 .add_link(
690 pk1.clone(),
691 pk2.clone(),
692 Link {
693 latency: Duration::from_millis(5),
694 jitter: Duration::from_millis(2),
695 success_rate: 1.0,
696 },
697 )
698 .await
699 .unwrap();
700 oracle
701 .add_link(
702 pk2.clone(),
703 pk1.clone(),
704 Link {
705 latency: Duration::from_millis(5),
706 jitter: Duration::from_millis(2),
707 success_rate: 1.0,
708 },
709 )
710 .await
711 .unwrap();
712
713 let msg1 = Bytes::from("attempt 2: hello from pk1");
715 let msg2 = Bytes::from("attempt 2: hello from pk2");
716 sender1
717 .send(Recipients::One(pk2.clone()), msg1.clone(), false)
718 .await
719 .unwrap();
720 sender2
721 .send(Recipients::One(pk1.clone()), msg2.clone(), false)
722 .await
723 .unwrap();
724
725 let (sender, message) = receiver1.recv().await.unwrap();
727 assert_eq!(sender, pk2);
728 assert_eq!(message, msg2);
729 let (sender, message) = receiver2.recv().await.unwrap();
730 assert_eq!(sender, pk1);
731 assert_eq!(message, msg1);
732
733 oracle.remove_link(pk1.clone(), pk2.clone()).await.unwrap();
735 oracle.remove_link(pk2.clone(), pk1.clone()).await.unwrap();
736
737 let msg1 = Bytes::from("attempt 3: hello from pk1");
739 let msg2 = Bytes::from("attempt 3: hello from pk2");
740 sender1
741 .send(Recipients::One(pk2.clone()), msg1.clone(), false)
742 .await
743 .unwrap();
744 sender2
745 .send(Recipients::One(pk1.clone()), msg2.clone(), false)
746 .await
747 .unwrap();
748
749 select! {
751 _ = receiver1.recv() => {
752 panic!("unexpected message");
753 },
754 _ = receiver2.recv() => {
755 panic!("unexpected message");
756 },
757 _ = context.sleep(Duration::from_secs(1)) => {},
758 }
759
760 let result = oracle.remove_link(pk1, pk2).await;
762 assert!(matches!(result, Err(Error::LinkMissing)));
763 });
764 }
765
766 async fn test_bandwidth_between_peers(
767 context: &mut deterministic::Context,
768 oracle: &mut Oracle<PublicKey>,
769 sender_bps: Option<usize>,
770 receiver_bps: Option<usize>,
771 message_size: usize,
772 expected_duration_ms: u64,
773 ) {
774 let pk1 = PrivateKey::from_seed(context.gen::<u64>()).public_key();
776 let pk2 = PrivateKey::from_seed(context.gen::<u64>()).public_key();
777 let (mut sender, _) = oracle.register(pk1.clone(), 0).await.unwrap();
778 let (_, mut receiver) = oracle.register(pk2.clone(), 0).await.unwrap();
779
780 oracle
782 .set_bandwidth(pk1.clone(), sender_bps.unwrap_or(usize::MAX), usize::MAX)
783 .await
784 .unwrap();
785 oracle
786 .set_bandwidth(pk2.clone(), usize::MAX, receiver_bps.unwrap_or(usize::MAX))
787 .await
788 .unwrap();
789
790 oracle
792 .add_link(
793 pk1.clone(),
794 pk2.clone(),
795 Link {
796 latency: Duration::ZERO,
798 jitter: Duration::ZERO,
799 success_rate: 1.0,
800 },
801 )
802 .await
803 .unwrap();
804
805 let msg = Bytes::from(vec![42u8; message_size]);
807 let start = context.current();
808 sender
809 .send(Recipients::One(pk2.clone()), msg.clone(), true)
810 .await
811 .unwrap();
812
813 let (origin, received) = receiver.recv().await.unwrap();
815 let elapsed = context.current().duration_since(start).unwrap();
816
817 assert_eq!(origin, pk1);
818 assert_eq!(received, msg);
819 assert!(
820 elapsed >= Duration::from_millis(expected_duration_ms),
821 "Message arrived too quickly: {elapsed:?} (expected >= {expected_duration_ms}ms)"
822 );
823 assert!(
824 elapsed < Duration::from_millis(expected_duration_ms + 100),
825 "Message took too long: {elapsed:?} (expected ~{expected_duration_ms}ms)"
826 );
827 }
828
829 #[test]
830 fn test_bandwidth() {
831 let executor = deterministic::Runner::default();
832 executor.start(|mut context| async move {
833 let (network, mut oracle) = Network::new(
834 context.with_label("network"),
835 Config {
836 max_size: 1024 * 1024,
837 },
838 );
839 network.start();
840
841 test_bandwidth_between_peers(
844 &mut context,
845 &mut oracle,
846 Some(1000), Some(1000), 500, 500, )
851 .await;
852
853 test_bandwidth_between_peers(
857 &mut context,
858 &mut oracle,
859 Some(500), Some(2000), 250, 500, )
864 .await;
865
866 test_bandwidth_between_peers(
870 &mut context,
871 &mut oracle,
872 Some(2000), Some(500), 250, 500, )
877 .await;
878
879 test_bandwidth_between_peers(
883 &mut context,
884 &mut oracle,
885 None, Some(1000), 500, 500, )
890 .await;
891
892 test_bandwidth_between_peers(
896 &mut context,
897 &mut oracle,
898 Some(1000), None, 500, 500, )
903 .await;
904
905 test_bandwidth_between_peers(
908 &mut context,
909 &mut oracle,
910 None, None, 500, 0, )
915 .await;
916 });
917 }
918
919 #[test]
920 fn test_bandwidth_contention() {
921 let executor = deterministic::Runner::default();
923 executor.start(|context| async move {
924 let (network, mut oracle) = Network::new(
925 context.with_label("network"),
926 Config {
927 max_size: 1024 * 1024,
928 },
929 );
930 network.start();
931
932 const NUM_PEERS: usize = 100;
934 const MESSAGE_SIZE: usize = 1000; const EFFECTIVE_BPS: usize = 10_000; let mut peers = Vec::with_capacity(NUM_PEERS + 1);
939 let mut senders = Vec::with_capacity(NUM_PEERS + 1);
940 let mut receivers = Vec::with_capacity(NUM_PEERS + 1);
941
942 for i in 0..=NUM_PEERS {
944 let pk = PrivateKey::from_seed(i as u64).public_key();
945 let (sender, receiver) = oracle.register(pk.clone(), 0).await.unwrap();
946 peers.push(pk);
947 senders.push(sender);
948 receivers.push(receiver);
949 }
950
951 for pk in &peers {
953 oracle
954 .set_bandwidth(pk.clone(), EFFECTIVE_BPS, EFFECTIVE_BPS)
955 .await
956 .unwrap();
957 }
958
959 for peer in peers.iter().skip(1) {
961 oracle
962 .add_link(
963 peer.clone(),
964 peers[0].clone(),
965 Link {
966 latency: Duration::ZERO,
967 jitter: Duration::ZERO,
968 success_rate: 1.0,
969 },
970 )
971 .await
972 .unwrap();
973 oracle
974 .add_link(
975 peers[0].clone(),
976 peer.clone(),
977 Link {
978 latency: Duration::ZERO,
979 jitter: Duration::ZERO,
980 success_rate: 1.0,
981 },
982 )
983 .await
984 .unwrap();
985 }
986
987 let start = context.current();
990
991 join_all(peers.iter().skip(1).map(|peer| {
994 let mut sender = senders[0].clone();
995 let recipient = peer.clone();
996 let msg = Bytes::from(vec![0u8; MESSAGE_SIZE]);
997 context.clone().spawn(|_| async move {
998 sender
999 .send(Recipients::One(recipient), msg, true)
1000 .await
1001 .unwrap()
1002 })
1003 }))
1004 .await;
1005
1006 let elapsed = context.current().duration_since(start).unwrap();
1007
1008 let expected_ms = (NUM_PEERS * MESSAGE_SIZE * 1000) / EFFECTIVE_BPS;
1010
1011 assert!(
1012 elapsed >= Duration::from_millis(expected_ms as u64),
1013 "One-to-many completed too quickly: {elapsed:?} (expected >= {expected_ms}ms)"
1014 );
1015 assert!(
1016 elapsed < Duration::from_millis((expected_ms as u64) + 500),
1017 "One-to-many took too long: {elapsed:?} (expected ~{expected_ms}ms)"
1018 );
1019
1020 for receiver in receivers.iter_mut().skip(1) {
1022 let (origin, received) = receiver.recv().await.unwrap();
1023 assert_eq!(origin, peers[0]);
1024 assert_eq!(received.len(), MESSAGE_SIZE);
1025 }
1026
1027 let start = context.current();
1029
1030 join_all(senders.iter().skip(1).map(|sender| {
1033 let mut sender = sender.clone();
1034 let recipient = peers[0].clone();
1035 let msg = Bytes::from(vec![0; MESSAGE_SIZE]);
1036 context.clone().spawn(|_| async move {
1037 sender
1038 .send(Recipients::One(recipient), msg, true)
1039 .await
1040 .unwrap()
1041 })
1042 }))
1043 .await;
1044
1045 let mut received_from = HashSet::new();
1047 for _ in 1..=NUM_PEERS {
1048 let (origin, received) = receivers[0].recv().await.unwrap();
1049 assert_eq!(received.len(), MESSAGE_SIZE);
1050 assert!(
1051 received_from.insert(origin.clone()),
1052 "Received duplicate from {origin:?}"
1053 );
1054 }
1055
1056 let elapsed = context.current().duration_since(start).unwrap();
1057
1058 let expected_ms = (NUM_PEERS * MESSAGE_SIZE * 1000) / EFFECTIVE_BPS;
1060
1061 assert!(
1062 elapsed >= Duration::from_millis(expected_ms as u64),
1063 "Many-to-one completed too quickly: {elapsed:?} (expected >= {expected_ms}ms)"
1064 );
1065 assert!(
1066 elapsed < Duration::from_millis((expected_ms as u64) + 500),
1067 "Many-to-one took too long: {elapsed:?} (expected ~{expected_ms}ms)"
1068 );
1069
1070 assert_eq!(received_from.len(), NUM_PEERS);
1072 for peer in peers.iter().skip(1) {
1073 assert!(received_from.contains(peer));
1074 }
1075 });
1076 }
1077
1078 #[test]
1079 fn test_message_ordering() {
1080 let executor = deterministic::Runner::default();
1082 executor.start(|context| async move {
1083 let (network, mut oracle) = Network::new(
1084 context.with_label("network"),
1085 Config {
1086 max_size: 1024 * 1024,
1087 },
1088 );
1089 network.start();
1090
1091 let pk1 = PrivateKey::from_seed(1).public_key();
1093 let pk2 = PrivateKey::from_seed(2).public_key();
1094 let (mut sender, _) = oracle.register(pk1.clone(), 0).await.unwrap();
1095 let (_, mut receiver) = oracle.register(pk2.clone(), 0).await.unwrap();
1096
1097 oracle
1099 .add_link(
1100 pk1.clone(),
1101 pk2.clone(),
1102 Link {
1103 latency: Duration::from_millis(50),
1104 jitter: Duration::from_millis(40),
1105 success_rate: 1.0,
1106 },
1107 )
1108 .await
1109 .unwrap();
1110
1111 let messages = vec![
1113 Bytes::from("message 1"),
1114 Bytes::from("message 2"),
1115 Bytes::from("message 3"),
1116 Bytes::from("message 4"),
1117 Bytes::from("message 5"),
1118 ];
1119
1120 for msg in messages.clone() {
1121 sender
1122 .send(Recipients::One(pk2.clone()), msg, true)
1123 .await
1124 .unwrap();
1125 }
1126
1127 for expected_msg in messages {
1129 let (origin, received_msg) = receiver.recv().await.unwrap();
1130 assert_eq!(origin, pk1);
1131 assert_eq!(received_msg, expected_msg);
1132 }
1133 })
1134 }
1135
1136 #[test]
1137 fn test_many_to_one_bandwidth_sharing() {
1138 let executor = deterministic::Runner::default();
1139 executor.start(|context| async move {
1140 let (network, mut oracle) = Network::new(
1141 context.with_label("network"),
1142 Config {
1143 max_size: 1024 * 1024,
1144 },
1145 );
1146 network.start();
1147
1148 let mut senders = Vec::new();
1150 let mut sender_txs = Vec::new();
1151 for i in 0..10 {
1152 let sender = ed25519::PrivateKey::from_seed(i).public_key();
1153 senders.push(sender.clone());
1154 let (tx, _) = oracle.register(sender.clone(), 0).await.unwrap();
1155 sender_txs.push(tx);
1156
1157 oracle
1159 .set_bandwidth(sender.clone(), 10_000, usize::MAX)
1160 .await
1161 .unwrap();
1162 }
1163
1164 let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1165 let (_, mut receiver_rx) = oracle.register(receiver.clone(), 0).await.unwrap();
1166
1167 oracle
1169 .set_bandwidth(receiver.clone(), usize::MAX, 100_000)
1170 .await
1171 .unwrap();
1172
1173 for sender in &senders {
1175 oracle
1176 .add_link(
1177 sender.clone(),
1178 receiver.clone(),
1179 Link {
1180 latency: Duration::ZERO,
1181 jitter: Duration::ZERO,
1182 success_rate: 1.0,
1183 },
1184 )
1185 .await
1186 .unwrap();
1187 }
1188
1189 let start = context.current();
1190
1191 let mut handles = Vec::new();
1193 for (i, mut tx) in sender_txs.into_iter().enumerate() {
1194 let receiver_clone = receiver.clone();
1195 let handle = context.clone().spawn(move |_| async move {
1196 let msg = Bytes::from(vec![i as u8; 10_000]);
1197 tx.send(Recipients::One(receiver_clone), msg, true)
1198 .await
1199 .unwrap();
1200 });
1201 handles.push(handle);
1202 }
1203
1204 for handle in handles {
1206 handle.await.unwrap();
1207 }
1208
1209 for i in 0..10 {
1212 let (_, _msg) = receiver_rx.recv().await.unwrap();
1213 let recv_time = context.current().duration_since(start).unwrap();
1214
1215 assert!(
1217 recv_time >= Duration::from_millis(950)
1218 && recv_time <= Duration::from_millis(1100),
1219 "Message {i} received at {recv_time:?}, expected ~1s",
1220 );
1221 }
1222 });
1223 }
1224
1225 #[test]
1226 fn test_one_to_many_fast_sender() {
1227 let executor = deterministic::Runner::default();
1230 executor.start(|context| async move {
1231 let (network, mut oracle) = Network::new(
1232 context.with_label("network"),
1233 Config {
1234 max_size: 1024 * 1024,
1235 },
1236 );
1237 network.start();
1238
1239 let sender = ed25519::PrivateKey::from_seed(0).public_key();
1241 let (sender_tx, _) = oracle.register(sender.clone(), 0).await.unwrap();
1242
1243 oracle
1245 .set_bandwidth(sender.clone(), 100_000, usize::MAX)
1246 .await
1247 .unwrap();
1248
1249 let mut receivers = Vec::new();
1251 let mut receiver_rxs = Vec::new();
1252 for i in 0..10 {
1253 let receiver = ed25519::PrivateKey::from_seed(i + 1).public_key();
1254 receivers.push(receiver.clone());
1255 let (_, rx) = oracle.register(receiver.clone(), 0).await.unwrap();
1256 receiver_rxs.push(rx);
1257
1258 oracle
1260 .set_bandwidth(receiver.clone(), usize::MAX, 10_000)
1261 .await
1262 .unwrap();
1263
1264 oracle
1266 .add_link(
1267 sender.clone(),
1268 receiver.clone(),
1269 Link {
1270 latency: Duration::ZERO,
1271 jitter: Duration::ZERO,
1272 success_rate: 1.0,
1273 },
1274 )
1275 .await
1276 .unwrap();
1277 }
1278
1279 let start = context.current();
1280
1281 let mut handles = Vec::new();
1283 for (i, receiver) in receivers.iter().enumerate() {
1284 let mut sender_tx = sender_tx.clone();
1285 let receiver_clone = receiver.clone();
1286 let handle = context.clone().spawn(move |_| async move {
1287 let msg = Bytes::from(vec![i as u8; 10_000]);
1288 sender_tx
1289 .send(Recipients::One(receiver_clone), msg, true)
1290 .await
1291 .unwrap();
1292 });
1293 handles.push(handle);
1294 }
1295
1296 for handle in handles {
1298 handle.await.unwrap();
1299 }
1300
1301 let send_time = context.current().duration_since(start).unwrap();
1302
1303 assert!(
1305 send_time >= Duration::from_millis(950) && send_time <= Duration::from_millis(1100),
1306 "Sender took {send_time:?} to send 100KB, expected ~1s",
1307 );
1308
1309 for (i, mut rx) in receiver_rxs.into_iter().enumerate() {
1311 let (_, msg) = rx.recv().await.unwrap();
1312 assert_eq!(msg[0], i as u8);
1313 let recv_time = context.current().duration_since(start).unwrap();
1314
1315 assert!(
1317 recv_time >= Duration::from_millis(950)
1318 && recv_time <= Duration::from_millis(1100),
1319 "Receiver {i} received at {recv_time:?}, expected ~1s",
1320 );
1321 }
1322 });
1323 }
1324
1325 #[test]
1326 fn test_many_slow_senders_to_fast_receiver() {
1327 let executor = deterministic::Runner::default();
1330 executor.start(|context| async move {
1331 let (network, mut oracle) = Network::new(
1332 context.with_label("network"),
1333 Config {
1334 max_size: 1024 * 1024,
1335 },
1336 );
1337 network.start();
1338
1339 let mut senders = Vec::new();
1341 let mut sender_txs = Vec::new();
1342 for i in 0..10 {
1343 let sender = ed25519::PrivateKey::from_seed(i).public_key();
1344 senders.push(sender.clone());
1345 let (tx, _) = oracle.register(sender.clone(), 0).await.unwrap();
1346 sender_txs.push(tx);
1347
1348 oracle
1350 .set_bandwidth(sender.clone(), 1_000, usize::MAX)
1351 .await
1352 .unwrap();
1353 }
1354
1355 let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1357 let (_, mut receiver_rx) = oracle.register(receiver.clone(), 0).await.unwrap();
1358
1359 oracle
1361 .set_bandwidth(receiver.clone(), usize::MAX, 10_000)
1362 .await
1363 .unwrap();
1364
1365 for sender in &senders {
1367 oracle
1368 .add_link(
1369 sender.clone(),
1370 receiver.clone(),
1371 Link {
1372 latency: Duration::ZERO,
1373 jitter: Duration::ZERO,
1374 success_rate: 1.0,
1375 },
1376 )
1377 .await
1378 .unwrap();
1379 }
1380
1381 let start = context.current();
1382
1383 let mut handles = Vec::new();
1385 for (i, mut tx) in sender_txs.into_iter().enumerate() {
1386 let receiver_clone = receiver.clone();
1387 let handle = context.clone().spawn(move |_| async move {
1388 let msg = Bytes::from(vec![i as u8; 1_000]);
1389 tx.send(Recipients::One(receiver_clone), msg, true)
1390 .await
1391 .unwrap();
1392 });
1393 handles.push(handle);
1394 }
1395
1396 for handle in handles {
1398 handle.await.unwrap();
1399 }
1400
1401 for i in 0..10 {
1407 let (_, _msg) = receiver_rx.recv().await.unwrap();
1408 let recv_time = context.current().duration_since(start).unwrap();
1409
1410 assert!(
1412 recv_time >= Duration::from_millis(950)
1413 && recv_time <= Duration::from_millis(1100),
1414 "Message {i} received at {recv_time:?}, expected ~1s",
1415 );
1416 }
1417 });
1418 }
1419
1420 #[test]
1421 fn test_dynamic_bandwidth_allocation_staggered() {
1422 let executor = deterministic::Runner::default();
1428 executor.start(|context| async move {
1429 let (network, mut oracle) = Network::new(
1430 context.with_label("network"),
1431 Config {
1432 max_size: 1024 * 1024,
1433 },
1434 );
1435 network.start();
1436
1437 let mut senders = Vec::new();
1439 let mut sender_txs = Vec::new();
1440 for i in 0..3 {
1441 let sender = ed25519::PrivateKey::from_seed(i).public_key();
1442 senders.push(sender.clone());
1443 let (tx, _) = oracle.register(sender.clone(), 0).await.unwrap();
1444 sender_txs.push(tx);
1445
1446 oracle
1448 .set_bandwidth(sender.clone(), 30_000, usize::MAX)
1449 .await
1450 .unwrap();
1451 }
1452
1453 let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1455 let (_, mut receiver_rx) = oracle.register(receiver.clone(), 0).await.unwrap();
1456 oracle
1457 .set_bandwidth(receiver.clone(), usize::MAX, 30_000)
1458 .await
1459 .unwrap();
1460
1461 for sender in &senders {
1463 oracle
1464 .add_link(
1465 sender.clone(),
1466 receiver.clone(),
1467 Link {
1468 latency: Duration::from_millis(1),
1469 jitter: Duration::ZERO,
1470 success_rate: 1.0,
1471 },
1472 )
1473 .await
1474 .unwrap();
1475 }
1476
1477 let start = context.current();
1478
1479 let mut tx0 = sender_txs[0].clone();
1482 let rx_clone = receiver.clone();
1483 context.clone().spawn(move |_| async move {
1484 let msg = Bytes::from(vec![0u8; 30_000]);
1485 tx0.send(Recipients::One(rx_clone), msg, true)
1486 .await
1487 .unwrap();
1488 });
1489
1490 let mut tx1 = sender_txs[1].clone();
1494 let rx_clone = receiver.clone();
1495 context.clone().spawn(move |context| async move {
1496 context.sleep(Duration::from_millis(500)).await;
1497 let msg = Bytes::from(vec![1u8; 30_000]);
1498 tx1.send(Recipients::One(rx_clone), msg, true)
1499 .await
1500 .unwrap();
1501 });
1502
1503 let mut tx2 = sender_txs[2].clone();
1507 let rx_clone = receiver.clone();
1508 context.clone().spawn(move |context| async move {
1509 context.sleep(Duration::from_millis(1500)).await;
1510 let msg = Bytes::from(vec![2u8; 15_000]);
1511 tx2.send(Recipients::One(rx_clone), msg, true)
1512 .await
1513 .unwrap();
1514 });
1515
1516 let (_, msg0) = receiver_rx.recv().await.unwrap();
1519 assert_eq!(msg0[0], 0);
1520 let t0 = context.current().duration_since(start).unwrap();
1521 assert!(
1522 t0 >= Duration::from_millis(1000) && t0 <= Duration::from_millis(1100),
1523 "Message 0 received at {t0:?}, expected ~1s",
1524 );
1525
1526 let (_, msg_a) = receiver_rx.recv().await.unwrap();
1530 let t_a = context.current().duration_since(start).unwrap();
1531
1532 let (_, msg_b) = receiver_rx.recv().await.unwrap();
1533 let t_b = context.current().duration_since(start).unwrap();
1534
1535 let (msg1, t1, msg2, t2) = if msg_a[0] == 1 {
1537 (msg_a, t_a, msg_b, t_b)
1538 } else {
1539 (msg_b, t_b, msg_a, t_a)
1540 };
1541
1542 assert_eq!(msg1[0], 1);
1543 assert_eq!(msg2[0], 2);
1544
1545 assert!(
1550 t1 >= Duration::from_millis(1500) && t1 <= Duration::from_millis(2600),
1551 "Message 1 received at {t1:?}, expected between 1.5s-2.6s",
1552 );
1553
1554 assert!(
1555 t2 >= Duration::from_millis(1500) && t2 <= Duration::from_millis(2600),
1556 "Message 2 received at {t2:?}, expected between 1.5s-2.6s",
1557 );
1558 });
1559 }
1560
1561 #[test]
1562 fn test_dynamic_bandwidth_varied_sizes() {
1563 let executor = deterministic::Runner::default();
1566 executor.start(|context| async move {
1567 let (network, mut oracle) = Network::new(
1568 context.with_label("network"),
1569 Config {
1570 max_size: 1024 * 1024,
1571 },
1572 );
1573 network.start();
1574
1575 let mut senders = Vec::new();
1577 let mut sender_txs = Vec::new();
1578 for i in 0..3 {
1579 let sender = ed25519::PrivateKey::from_seed(i).public_key();
1580 senders.push(sender.clone());
1581 let (tx, _) = oracle.register(sender.clone(), 0).await.unwrap();
1582 sender_txs.push(tx);
1583
1584 oracle
1586 .set_bandwidth(sender.clone(), usize::MAX, usize::MAX)
1587 .await
1588 .unwrap();
1589 }
1590
1591 let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1593 let (_, mut receiver_rx) = oracle.register(receiver.clone(), 0).await.unwrap();
1594 oracle
1595 .set_bandwidth(receiver.clone(), usize::MAX, 30_000)
1596 .await
1597 .unwrap();
1598
1599 for sender in &senders {
1601 oracle
1602 .add_link(
1603 sender.clone(),
1604 receiver.clone(),
1605 Link {
1606 latency: Duration::from_millis(1),
1607 jitter: Duration::ZERO,
1608 success_rate: 1.0,
1609 },
1610 )
1611 .await
1612 .unwrap();
1613 }
1614
1615 let start = context.current();
1616
1617 let sizes = [10_000, 20_000, 30_000];
1624 let mut handles = Vec::new();
1625
1626 for (i, (mut tx, size)) in sender_txs.into_iter().zip(sizes.iter()).enumerate() {
1627 let rx_clone = receiver.clone();
1628 let msg_size = *size;
1629 let handle = context.clone().spawn(move |_| async move {
1630 let msg = Bytes::from(vec![i as u8; msg_size]);
1631 tx.send(Recipients::One(rx_clone), msg, true).await.unwrap();
1632 });
1633 handles.push(handle);
1634 }
1635
1636 for handle in handles {
1638 handle.await.unwrap();
1639 }
1640
1641 let mut messages = Vec::new();
1645 for _ in 0..3 {
1646 let (_, msg) = receiver_rx.recv().await.unwrap();
1647 let t = context.current().duration_since(start).unwrap();
1648 messages.push((msg[0] as usize, msg.len(), t));
1649 }
1650
1651 assert_eq!(messages.len(), 3);
1656
1657 let max_time = messages.iter().map(|&(_, _, t)| t).max().unwrap();
1659 assert!(
1660 max_time >= Duration::from_millis(2000),
1661 "Total time {max_time:?} should be at least 2s for 60KB at 30KB/s",
1662 );
1663 });
1664 }
1665
1666 #[test]
1667 fn test_bandwidth_pipe_reservation_duration() {
1668 let executor = deterministic::Runner::default();
1671 executor.start(|context| async move {
1672 let (network, mut oracle) = Network::new(
1673 context.with_label("network"),
1674 Config {
1675 max_size: 1024 * 1024,
1676 },
1677 );
1678 network.start();
1679
1680 let sender = PrivateKey::from_seed(1).public_key();
1682 let receiver = PrivateKey::from_seed(2).public_key();
1683
1684 let (sender_tx, _) = oracle.register(sender.clone(), 0).await.unwrap();
1685 let (_, mut receiver_rx) = oracle.register(receiver.clone(), 0).await.unwrap();
1686
1687 oracle
1689 .set_bandwidth(sender.clone(), 1000, usize::MAX)
1690 .await
1691 .unwrap();
1692 oracle
1693 .set_bandwidth(receiver.clone(), usize::MAX, 1000)
1694 .await
1695 .unwrap();
1696
1697 oracle
1699 .add_link(
1700 sender.clone(),
1701 receiver.clone(),
1702 Link {
1703 latency: Duration::from_secs(1), jitter: Duration::ZERO,
1705 success_rate: 1.0,
1706 },
1707 )
1708 .await
1709 .unwrap();
1710
1711 let start = context.current();
1723
1724 let mut handles = Vec::new();
1726 for i in 0..3 {
1727 let mut sender_tx = sender_tx.clone();
1728 let receiver = receiver.clone();
1729 let msg = Bytes::from(vec![i; 500]);
1730 let handle = context.clone().spawn(move |context| async move {
1731 sender_tx
1732 .send(Recipients::One(receiver), msg, false)
1733 .await
1734 .unwrap();
1735
1736 context.current().duration_since(start).unwrap()
1738 });
1739 handles.push(handle);
1740
1741 context.sleep(Duration::from_millis(1)).await;
1743 }
1744
1745 let mut send_times = Vec::new();
1747 for handle in handles {
1748 let time = handle.await.unwrap();
1749 send_times.push(time);
1750 }
1751
1752 for (i, time) in send_times.iter().enumerate() {
1755 let expected_min = i as u64 * 500;
1757 let expected_max = expected_min + 600;
1758
1759 assert!(
1760 *time >= Duration::from_millis(expected_min)
1761 && *time <= Duration::from_millis(expected_max),
1762 "Send {} should be acknowledged at ~{}ms-{}ms, got {:?}",
1763 i + 1,
1764 expected_min,
1765 expected_max,
1766 time
1767 );
1768 }
1769
1770 let mut receive_times = Vec::new();
1772 for i in 0..3 {
1773 let (_, received) = receiver_rx.recv().await.unwrap();
1774 receive_times.push(context.current().duration_since(start).unwrap());
1775 assert_eq!(received[0], i);
1776 }
1777
1778 for (i, time) in receive_times.iter().enumerate() {
1783 let expected_min = (i as u64 * 500) + 1500;
1784 let expected_max = expected_min + 100;
1785
1786 assert!(
1787 *time >= Duration::from_millis(expected_min)
1788 && *time < Duration::from_millis(expected_max),
1789 "Message {} should arrive at ~{}ms, got {:?}",
1790 i + 1,
1791 expected_min,
1792 time
1793 );
1794 }
1795 });
1796 }
1797
1798 #[test]
1799 fn test_dynamic_bandwidth_affects_new_transfers() {
1800 let executor = deterministic::Runner::default();
1803 executor.start(|context| async move {
1804 let (network, mut oracle) = Network::new(
1805 context.with_label("network"),
1806 Config {
1807 max_size: 1024 * 1024,
1808 },
1809 );
1810 network.start();
1811
1812 let pk_sender = PrivateKey::from_seed(1).public_key();
1813 let pk_receiver = PrivateKey::from_seed(2).public_key();
1814
1815 let (mut sender_tx, _) = oracle.register(pk_sender.clone(), 0).await.unwrap();
1817 let (_, mut receiver_rx) = oracle.register(pk_receiver.clone(), 0).await.unwrap();
1818 oracle
1819 .add_link(
1820 pk_sender.clone(),
1821 pk_receiver.clone(),
1822 Link {
1823 latency: Duration::from_millis(1), jitter: Duration::ZERO,
1825 success_rate: 1.0,
1826 },
1827 )
1828 .await
1829 .unwrap();
1830
1831 oracle
1833 .set_bandwidth(pk_sender.clone(), 10_000, usize::MAX)
1834 .await
1835 .unwrap();
1836 oracle
1837 .set_bandwidth(pk_receiver.clone(), usize::MAX, 10_000)
1838 .await
1839 .unwrap();
1840
1841 let msg1 = Bytes::from(vec![1u8; 20_000]); let start_time = context.current();
1844 sender_tx
1845 .send(Recipients::One(pk_receiver.clone()), msg1.clone(), false)
1846 .await
1847 .unwrap();
1848
1849 let (_sender, received_msg1) = receiver_rx.recv().await.unwrap();
1851 let msg1_time = context.current().duration_since(start_time).unwrap();
1852 assert_eq!(received_msg1.len(), 20_000);
1853 assert!(
1854 msg1_time >= Duration::from_millis(1999)
1855 && msg1_time <= Duration::from_millis(2010),
1856 "First message should take ~2s, got {msg1_time:?}",
1857 );
1858
1859 oracle
1861 .set_bandwidth(pk_sender.clone(), 2_000, usize::MAX)
1862 .await
1863 .unwrap();
1864
1865 let msg2 = Bytes::from(vec![2u8; 10_000]); let msg2_start = context.current();
1868 sender_tx
1869 .send(Recipients::One(pk_receiver.clone()), msg2.clone(), false)
1870 .await
1871 .unwrap();
1872
1873 let (_sender, received_msg2) = receiver_rx.recv().await.unwrap();
1875 let msg2_time = context.current().duration_since(msg2_start).unwrap();
1876 assert_eq!(received_msg2.len(), 10_000);
1877 assert!(
1878 msg2_time >= Duration::from_millis(4999)
1879 && msg2_time <= Duration::from_millis(5010),
1880 "Second message should take ~5s at reduced bandwidth, got {msg2_time:?}",
1881 );
1882 });
1883 }
1884
1885 #[test]
1886 fn test_zero_sender_bandwidth_returns_empty() {
1887 let executor = deterministic::Runner::default();
1889 executor.start(|context| async move {
1890 let (network, mut oracle) = Network::new(
1891 context.with_label("network"),
1892 Config {
1893 max_size: 1024 * 1024,
1894 },
1895 );
1896 network.start();
1897
1898 let pk_sender = PrivateKey::from_seed(1).public_key();
1899 let pk_receiver = PrivateKey::from_seed(2).public_key();
1900
1901 let (sender_tx, _) = oracle.register(pk_sender.clone(), 0).await.unwrap();
1903 let (_, mut receiver_rx) = oracle.register(pk_receiver.clone(), 0).await.unwrap();
1904 oracle
1905 .add_link(
1906 pk_sender.clone(),
1907 pk_receiver.clone(),
1908 Link {
1909 latency: Duration::from_millis(10),
1910 jitter: Duration::ZERO,
1911 success_rate: 1.0,
1912 },
1913 )
1914 .await
1915 .unwrap();
1916
1917 oracle
1919 .set_bandwidth(pk_sender.clone(), 0, usize::MAX)
1920 .await
1921 .unwrap();
1922 oracle
1923 .set_bandwidth(pk_receiver.clone(), usize::MAX, 10_000)
1924 .await
1925 .unwrap();
1926
1927 let msg = Bytes::from(vec![1u8; 1000]);
1928
1929 let mut sender = sender_tx.clone();
1931 let result = sender
1932 .send(Recipients::One(pk_receiver.clone()), msg.clone(), false)
1933 .await
1934 .unwrap();
1935
1936 assert!(result.is_empty());
1938
1939 select! {
1941 _ = receiver_rx.recv() => {
1942 panic!(
1943 "Should not receive message when sender bandwidth is 0"
1944 );
1945 },
1946 _ = context.clone().sleep(Duration::from_secs(2)) => {},
1947 };
1948 });
1949 }
1950
1951 #[test]
1952 fn test_zero_receiver_bandwidth_uses_sender_bandwidth() {
1953 let executor = deterministic::Runner::default();
1955 executor.start(|context| async move {
1956 let (network, mut oracle) = Network::new(
1957 context.with_label("network"),
1958 Config {
1959 max_size: 1024 * 1024,
1960 },
1961 );
1962 network.start();
1963
1964 let pk_sender = PrivateKey::from_seed(1).public_key();
1965 let pk_receiver = PrivateKey::from_seed(2).public_key();
1966
1967 let (mut sender_tx, _) = oracle.register(pk_sender.clone(), 0).await.unwrap();
1969 let (_, mut receiver_rx) = oracle.register(pk_receiver.clone(), 0).await.unwrap();
1970 oracle
1971 .add_link(
1972 pk_sender.clone(),
1973 pk_receiver.clone(),
1974 Link {
1975 latency: Duration::from_millis(10),
1976 jitter: Duration::ZERO,
1977 success_rate: 1.0,
1978 },
1979 )
1980 .await
1981 .unwrap();
1982
1983 oracle
1984 .set_bandwidth(pk_sender.clone(), 10_000, usize::MAX)
1985 .await
1986 .unwrap();
1987 oracle
1988 .set_bandwidth(pk_receiver.clone(), usize::MAX, 0)
1989 .await
1990 .unwrap();
1991
1992 let msg2 = Bytes::from(vec![2u8; 10_000]); let start2 = context.current();
1994 let sent = sender_tx
1995 .send(Recipients::One(pk_receiver.clone()), msg2.clone(), false)
1996 .await
1997 .unwrap();
1998 let send_time = context.current().duration_since(start2).unwrap();
1999
2000 assert_eq!(sent.len(), 1);
2001 assert_eq!(sent[0], pk_receiver);
2002 assert!(
2004 send_time >= Duration::from_millis(999) && send_time <= Duration::from_millis(1010),
2005 "With receiver bandwidth 0, should still use sender bandwidth (~1s), got {send_time:?}",
2006 );
2007
2008 select! {
2011 _ = receiver_rx.recv() => {
2012 panic!(
2013 "Should not receive message when receiver bandwidth is 0"
2014 );
2015 },
2016 _ = context.clone().sleep(Duration::from_secs(2)) => {},
2017 };
2018 });
2019 }
2020}