1mod bandwidth;
139mod ingress;
140mod metrics;
141mod network;
142mod transmitter;
143
144use thiserror::Error;
145
146#[derive(Debug, Error)]
148pub enum Error {
149 #[error("message too large: {0}")]
150 MessageTooLarge(usize),
151 #[error("network closed")]
152 NetworkClosed,
153 #[error("not valid to link self")]
154 LinkingSelf,
155 #[error("link already exists")]
156 LinkExists,
157 #[error("link missing")]
158 LinkMissing,
159 #[error("invalid success rate (must be in [0, 1]): {0}")]
160 InvalidSuccessRate(f64),
161 #[error("channel already registered: {0}")]
162 ChannelAlreadyRegistered(u64),
163 #[error("send_frame failed")]
164 SendFrameFailed,
165 #[error("recv_frame failed")]
166 RecvFrameFailed,
167 #[error("bind failed")]
168 BindFailed,
169 #[error("accept failed")]
170 AcceptFailed,
171 #[error("dial failed")]
172 DialFailed,
173 #[error("peer missing")]
174 PeerMissing,
175}
176
177pub use ingress::{Link, Oracle};
178pub use network::{Config, Network, Receiver, Sender};
179
180#[cfg(test)]
181mod tests {
182 use super::*;
183 use crate::{Manager, Receiver, Recipients, Sender};
184 use bytes::Bytes;
185 use commonware_cryptography::{
186 ed25519::{self, PrivateKey, PublicKey},
187 PrivateKeyExt as _, Signer as _,
188 };
189 use commonware_macros::select;
190 use commonware_runtime::{deterministic, Clock, Metrics, Runner, Spawner};
191 use futures::{channel::mpsc, SinkExt, StreamExt};
192 use rand::Rng;
193 use std::{
194 collections::{BTreeMap, HashMap, HashSet},
195 time::Duration,
196 };
197
198 fn simulate_messages(seed: u64, size: usize) -> (String, Vec<usize>) {
199 let executor = deterministic::Runner::seeded(seed);
200 executor.start(|context| async move {
201 let (network, mut oracle) = Network::new(
203 context.with_label("network"),
204 Config {
205 max_size: 1024 * 1024,
206 disconnect_on_block: true,
207 tracked_peer_sets: None,
208 },
209 );
210
211 network.start();
213
214 let mut agents = BTreeMap::new();
216 let (seen_sender, mut seen_receiver) = mpsc::channel(1024);
217 for i in 0..size {
218 let pk = PrivateKey::from_seed(i as u64).public_key();
219 let (sender, mut receiver) = oracle.control(pk.clone()).register(0).await.unwrap();
220 agents.insert(pk, sender);
221 let mut agent_sender = seen_sender.clone();
222 context
223 .with_label("agent_receiver")
224 .spawn(move |_| async move {
225 for _ in 0..size {
226 receiver.recv().await.unwrap();
227 }
228 agent_sender.send(i).await.unwrap();
229
230 });
232 }
233
234 let only_inbound = PrivateKey::from_seed(0).public_key();
236 for agent in agents.keys() {
237 if agent == &only_inbound {
238 continue;
240 }
241 for other in agents.keys() {
242 let result = oracle
243 .add_link(
244 agent.clone(),
245 other.clone(),
246 Link {
247 latency: Duration::from_millis(5),
248 jitter: Duration::from_millis(2),
249 success_rate: 0.75,
250 },
251 )
252 .await;
253 if agent == other {
254 assert!(matches!(result, Err(Error::LinkingSelf)));
255 } else {
256 assert!(result.is_ok());
257 }
258 }
259 }
260
261 context
263 .with_label("agent_sender")
264 .spawn(|mut context| async move {
265 let keys = agents.keys().collect::<Vec<_>>();
267
268 loop {
270 let index = context.gen_range(0..keys.len());
271 let sender = keys[index];
272 let msg = format!("hello from {sender:?}");
273 let msg = Bytes::from(msg);
274 let mut message_sender = agents.get(sender).unwrap().clone();
275 let sent = message_sender
276 .send(Recipients::All, msg.clone(), false)
277 .await
278 .unwrap();
279 if sender == &only_inbound {
280 assert_eq!(sent.len(), 0);
281 } else {
282 assert_eq!(sent.len(), keys.len() - 1);
283 }
284 }
285 });
286
287 let mut results = Vec::new();
289 for _ in 0..size {
290 results.push(seen_receiver.next().await.unwrap());
291 }
292 (context.auditor().state(), results)
293 })
294 }
295
296 fn compare_outputs(seeds: u64, size: usize) {
297 let mut outputs = Vec::new();
299 for seed in 0..seeds {
300 outputs.push(simulate_messages(seed, size));
301 }
302
303 for seed in 0..seeds {
305 let output = simulate_messages(seed, size);
306 assert_eq!(output, outputs[seed as usize]);
307 }
308 }
309
310 #[test]
311 fn test_determinism() {
312 compare_outputs(25, 25);
313 }
314
315 #[test]
316 fn test_message_too_big() {
317 let executor = deterministic::Runner::default();
318 executor.start(|mut context| async move {
319 let (network, oracle) = Network::new(
321 context.with_label("network"),
322 Config {
323 max_size: 1024 * 1024,
324 disconnect_on_block: true,
325 tracked_peer_sets: None,
326 },
327 );
328
329 network.start();
331
332 let mut agents = HashMap::new();
334 for i in 0..10 {
335 let pk = PrivateKey::from_seed(i as u64).public_key();
336 let (sender, _) = oracle.control(pk.clone()).register(0).await.unwrap();
337 agents.insert(pk, sender);
338 }
339
340 let keys = agents.keys().collect::<Vec<_>>();
342 let index = context.gen_range(0..keys.len());
343 let sender = keys[index];
344 let mut message_sender = agents.get(sender).unwrap().clone();
345 let mut msg = vec![0u8; 1024 * 1024 + 1];
346 context.fill(&mut msg[..]);
347 let result = message_sender
348 .send(Recipients::All, msg.into(), false)
349 .await
350 .unwrap_err();
351
352 assert!(matches!(result, Error::MessageTooLarge(_)));
354 });
355 }
356
357 #[test]
358 fn test_linking_self() {
359 let executor = deterministic::Runner::default();
360 executor.start(|context| async move {
361 let (network, mut oracle) = Network::new(
363 context.with_label("network"),
364 Config {
365 max_size: 1024 * 1024,
366 disconnect_on_block: true,
367 tracked_peer_sets: None,
368 },
369 );
370
371 network.start();
373
374 let pk = PrivateKey::from_seed(0).public_key();
376 oracle.control(pk.clone()).register(0).await.unwrap();
377
378 let result = oracle
380 .add_link(
381 pk.clone(),
382 pk,
383 Link {
384 latency: Duration::from_millis(5),
385 jitter: Duration::from_millis(2),
386 success_rate: 0.75,
387 },
388 )
389 .await;
390
391 assert!(matches!(result, Err(Error::LinkingSelf)));
393 });
394 }
395
396 #[test]
397 fn test_duplicate_channel() {
398 let executor = deterministic::Runner::default();
399 executor.start(|context| async move {
400 let (network, oracle) = Network::new(
402 context.with_label("network"),
403 Config {
404 max_size: 1024 * 1024,
405 disconnect_on_block: true,
406 tracked_peer_sets: None,
407 },
408 );
409
410 network.start();
412
413 let pk = PrivateKey::from_seed(0).public_key();
415 oracle.control(pk.clone()).register(0).await.unwrap();
416 let result = oracle.control(pk.clone()).register(0).await;
417
418 assert!(matches!(result, Err(Error::ChannelAlreadyRegistered(0))));
420 });
421 }
422
423 #[test]
424 fn test_invalid_success_rate() {
425 let executor = deterministic::Runner::default();
426 executor.start(|context| async move {
427 let (network, mut oracle) = Network::new(
429 context.with_label("network"),
430 Config {
431 max_size: 1024 * 1024,
432 disconnect_on_block: true,
433 tracked_peer_sets: None,
434 },
435 );
436
437 network.start();
439
440 let pk1 = PrivateKey::from_seed(0).public_key();
442 let pk2 = PrivateKey::from_seed(1).public_key();
443 oracle.control(pk1.clone()).register(0).await.unwrap();
444 oracle.control(pk2.clone()).register(0).await.unwrap();
445
446 let result = oracle
448 .add_link(
449 pk1,
450 pk2,
451 Link {
452 latency: Duration::from_millis(5),
453 jitter: Duration::from_millis(2),
454 success_rate: 1.5,
455 },
456 )
457 .await;
458
459 assert!(matches!(result, Err(Error::InvalidSuccessRate(_))));
461 });
462 }
463
464 #[test]
465 fn test_simple_message_delivery() {
466 let executor = deterministic::Runner::default();
467 executor.start(|context| async move {
468 let (network, mut oracle) = Network::new(
470 context.with_label("network"),
471 Config {
472 max_size: 1024 * 1024,
473 disconnect_on_block: true,
474 tracked_peer_sets: None,
475 },
476 );
477
478 network.start();
480
481 let pk1 = PrivateKey::from_seed(0).public_key();
483 let pk2 = PrivateKey::from_seed(1).public_key();
484 let (mut sender1, mut receiver1) =
485 oracle.control(pk1.clone()).register(0).await.unwrap();
486 let (mut sender2, mut receiver2) =
487 oracle.control(pk2.clone()).register(0).await.unwrap();
488
489 let _ = oracle.control(pk1.clone()).register(1).await.unwrap();
491 let _ = oracle.control(pk2.clone()).register(2).await.unwrap();
492
493 oracle
495 .add_link(
496 pk1.clone(),
497 pk2.clone(),
498 Link {
499 latency: Duration::from_millis(5),
500 jitter: Duration::from_millis(2),
501 success_rate: 1.0,
502 },
503 )
504 .await
505 .unwrap();
506 oracle
507 .add_link(
508 pk2.clone(),
509 pk1.clone(),
510 Link {
511 latency: Duration::from_millis(5),
512 jitter: Duration::from_millis(2),
513 success_rate: 1.0,
514 },
515 )
516 .await
517 .unwrap();
518
519 let msg1 = Bytes::from("hello from pk1");
521 let msg2 = Bytes::from("hello from pk2");
522 sender1
523 .send(Recipients::One(pk2.clone()), msg1.clone(), false)
524 .await
525 .unwrap();
526 sender2
527 .send(Recipients::One(pk1.clone()), msg2.clone(), false)
528 .await
529 .unwrap();
530
531 let (sender, message) = receiver1.recv().await.unwrap();
533 assert_eq!(sender, pk2);
534 assert_eq!(message, msg2);
535 let (sender, message) = receiver2.recv().await.unwrap();
536 assert_eq!(sender, pk1);
537 assert_eq!(message, msg1);
538 });
539 }
540
541 #[test]
542 fn test_send_wrong_channel() {
543 let executor = deterministic::Runner::default();
544 executor.start(|context| async move {
545 let (network, mut oracle) = Network::new(
547 context.with_label("network"),
548 Config {
549 max_size: 1024 * 1024,
550 disconnect_on_block: true,
551 tracked_peer_sets: None,
552 },
553 );
554
555 network.start();
557
558 let pk1 = PrivateKey::from_seed(0).public_key();
560 let pk2 = PrivateKey::from_seed(1).public_key();
561 let (mut sender1, _) = oracle.control(pk1.clone()).register(0).await.unwrap();
562 let (_, mut receiver2) = oracle.control(pk2.clone()).register(1).await.unwrap();
563
564 oracle
566 .add_link(
567 pk1,
568 pk2.clone(),
569 Link {
570 latency: Duration::from_millis(5),
571 jitter: Duration::ZERO,
572 success_rate: 1.0,
573 },
574 )
575 .await
576 .unwrap();
577
578 let msg = Bytes::from("hello from pk1");
580 sender1
581 .send(Recipients::One(pk2), msg, false)
582 .await
583 .unwrap();
584
585 select! {
587 _ = receiver2.recv() => {
588 panic!("unexpected message");
589 },
590 _ = context.sleep(Duration::from_secs(1)) => {},
591 }
592 });
593 }
594
595 #[test]
596 fn test_dynamic_peers() {
597 let executor = deterministic::Runner::default();
598 executor.start(|context| async move {
599 let (network, mut oracle) = Network::new(
601 context.with_label("network"),
602 Config {
603 max_size: 1024 * 1024,
604 disconnect_on_block: true,
605 tracked_peer_sets: None,
606 },
607 );
608
609 network.start();
611
612 let pk1 = PrivateKey::from_seed(0).public_key();
614 let pk2 = PrivateKey::from_seed(1).public_key();
615 let (mut sender1, mut receiver1) =
616 oracle.control(pk1.clone()).register(0).await.unwrap();
617 let (mut sender2, mut receiver2) =
618 oracle.control(pk2.clone()).register(0).await.unwrap();
619
620 oracle
622 .add_link(
623 pk1.clone(),
624 pk2.clone(),
625 Link {
626 latency: Duration::from_millis(5),
627 jitter: Duration::from_millis(2),
628 success_rate: 1.0,
629 },
630 )
631 .await
632 .unwrap();
633 oracle
634 .add_link(
635 pk2.clone(),
636 pk1.clone(),
637 Link {
638 latency: Duration::from_millis(5),
639 jitter: Duration::from_millis(2),
640 success_rate: 1.0,
641 },
642 )
643 .await
644 .unwrap();
645
646 let msg1 = Bytes::from("attempt 1: hello from pk1");
648 let msg2 = Bytes::from("attempt 1: hello from pk2");
649 sender1
650 .send(Recipients::One(pk2.clone()), msg1.clone(), false)
651 .await
652 .unwrap();
653 sender2
654 .send(Recipients::One(pk1.clone()), msg2.clone(), false)
655 .await
656 .unwrap();
657
658 let (sender, message) = receiver1.recv().await.unwrap();
660 assert_eq!(sender, pk2);
661 assert_eq!(message, msg2);
662 let (sender, message) = receiver2.recv().await.unwrap();
663 assert_eq!(sender, pk1);
664 assert_eq!(message, msg1);
665 });
666 }
667
668 #[test]
669 fn test_dynamic_links() {
670 let executor = deterministic::Runner::default();
671 executor.start(|context| async move {
672 let (network, mut oracle) = Network::new(
674 context.with_label("network"),
675 Config {
676 max_size: 1024 * 1024,
677 disconnect_on_block: true,
678 tracked_peer_sets: None,
679 },
680 );
681
682 network.start();
684
685 let pk1 = PrivateKey::from_seed(0).public_key();
687 let pk2 = PrivateKey::from_seed(1).public_key();
688 let (mut sender1, mut receiver1) =
689 oracle.control(pk1.clone()).register(0).await.unwrap();
690 let (mut sender2, mut receiver2) =
691 oracle.control(pk2.clone()).register(0).await.unwrap();
692
693 let msg1 = Bytes::from("attempt 1: hello from pk1");
695 let msg2 = Bytes::from("attempt 1: hello from pk2");
696 sender1
697 .send(Recipients::One(pk2.clone()), msg1.clone(), false)
698 .await
699 .unwrap();
700 sender2
701 .send(Recipients::One(pk1.clone()), msg2.clone(), false)
702 .await
703 .unwrap();
704
705 select! {
707 _ = receiver1.recv() => {
708 panic!("unexpected message");
709 },
710 _ = receiver2.recv() => {
711 panic!("unexpected message");
712 },
713 _ = context.sleep(Duration::from_secs(1)) => {},
714 }
715
716 oracle
718 .add_link(
719 pk1.clone(),
720 pk2.clone(),
721 Link {
722 latency: Duration::from_millis(5),
723 jitter: Duration::from_millis(2),
724 success_rate: 1.0,
725 },
726 )
727 .await
728 .unwrap();
729 oracle
730 .add_link(
731 pk2.clone(),
732 pk1.clone(),
733 Link {
734 latency: Duration::from_millis(5),
735 jitter: Duration::from_millis(2),
736 success_rate: 1.0,
737 },
738 )
739 .await
740 .unwrap();
741
742 let msg1 = Bytes::from("attempt 2: hello from pk1");
744 let msg2 = Bytes::from("attempt 2: hello from pk2");
745 sender1
746 .send(Recipients::One(pk2.clone()), msg1.clone(), false)
747 .await
748 .unwrap();
749 sender2
750 .send(Recipients::One(pk1.clone()), msg2.clone(), false)
751 .await
752 .unwrap();
753
754 let (sender, message) = receiver1.recv().await.unwrap();
756 assert_eq!(sender, pk2);
757 assert_eq!(message, msg2);
758 let (sender, message) = receiver2.recv().await.unwrap();
759 assert_eq!(sender, pk1);
760 assert_eq!(message, msg1);
761
762 oracle.remove_link(pk1.clone(), pk2.clone()).await.unwrap();
764 oracle.remove_link(pk2.clone(), pk1.clone()).await.unwrap();
765
766 let msg1 = Bytes::from("attempt 3: hello from pk1");
768 let msg2 = Bytes::from("attempt 3: hello from pk2");
769 sender1
770 .send(Recipients::One(pk2.clone()), msg1.clone(), false)
771 .await
772 .unwrap();
773 sender2
774 .send(Recipients::One(pk1.clone()), msg2.clone(), false)
775 .await
776 .unwrap();
777
778 select! {
780 _ = receiver1.recv() => {
781 panic!("unexpected message");
782 },
783 _ = receiver2.recv() => {
784 panic!("unexpected message");
785 },
786 _ = context.sleep(Duration::from_secs(1)) => {},
787 }
788
789 let result = oracle.remove_link(pk1, pk2).await;
791 assert!(matches!(result, Err(Error::LinkMissing)));
792 });
793 }
794
795 async fn test_bandwidth_between_peers(
796 context: &mut deterministic::Context,
797 oracle: &mut Oracle<PublicKey>,
798 sender_bps: Option<usize>,
799 receiver_bps: Option<usize>,
800 message_size: usize,
801 expected_duration_ms: u64,
802 ) {
803 let pk1 = PrivateKey::from_seed(context.gen::<u64>()).public_key();
805 let pk2 = PrivateKey::from_seed(context.gen::<u64>()).public_key();
806 let (mut sender, _) = oracle.control(pk1.clone()).register(0).await.unwrap();
807 let (_, mut receiver) = oracle.control(pk2.clone()).register(0).await.unwrap();
808
809 oracle
811 .limit_bandwidth(pk1.clone(), sender_bps, None)
812 .await
813 .unwrap();
814 oracle
815 .limit_bandwidth(pk2.clone(), None, receiver_bps)
816 .await
817 .unwrap();
818
819 oracle
821 .add_link(
822 pk1.clone(),
823 pk2.clone(),
824 Link {
825 latency: Duration::ZERO,
827 jitter: Duration::ZERO,
828 success_rate: 1.0,
829 },
830 )
831 .await
832 .unwrap();
833
834 let msg = Bytes::from(vec![42u8; message_size]);
836 let start = context.current();
837 sender
838 .send(Recipients::One(pk2.clone()), msg.clone(), true)
839 .await
840 .unwrap();
841
842 let (origin, received) = receiver.recv().await.unwrap();
844 let elapsed = context.current().duration_since(start).unwrap();
845
846 assert_eq!(origin, pk1);
847 assert_eq!(received, msg);
848 assert!(
849 elapsed >= Duration::from_millis(expected_duration_ms),
850 "Message arrived too quickly: {elapsed:?} (expected >= {expected_duration_ms}ms)"
851 );
852 assert!(
853 elapsed < Duration::from_millis(expected_duration_ms + 100),
854 "Message took too long: {elapsed:?} (expected ~{expected_duration_ms}ms)"
855 );
856 }
857
858 #[test]
859 fn test_bandwidth() {
860 let executor = deterministic::Runner::default();
861 executor.start(|mut context| async move {
862 let (network, mut oracle) = Network::new(
863 context.with_label("network"),
864 Config {
865 max_size: 1024 * 1024,
866 disconnect_on_block: true,
867 tracked_peer_sets: None,
868 },
869 );
870 network.start();
871
872 test_bandwidth_between_peers(
875 &mut context,
876 &mut oracle,
877 Some(1000), Some(1000), 500, 500, )
882 .await;
883
884 test_bandwidth_between_peers(
888 &mut context,
889 &mut oracle,
890 Some(500), Some(2000), 250, 500, )
895 .await;
896
897 test_bandwidth_between_peers(
901 &mut context,
902 &mut oracle,
903 Some(2000), Some(500), 250, 500, )
908 .await;
909
910 test_bandwidth_between_peers(
914 &mut context,
915 &mut oracle,
916 None, Some(1000), 500, 500, )
921 .await;
922
923 test_bandwidth_between_peers(
927 &mut context,
928 &mut oracle,
929 Some(1000), None, 500, 500, )
934 .await;
935
936 test_bandwidth_between_peers(
939 &mut context,
940 &mut oracle,
941 None, None, 500, 0, )
946 .await;
947 });
948 }
949
950 #[test]
951 fn test_bandwidth_contention() {
952 let executor = deterministic::Runner::default();
954 executor.start(|context| async move {
955 let (network, mut oracle) = Network::new(
956 context.with_label("network"),
957 Config {
958 max_size: 1024 * 1024,
959 disconnect_on_block: true,
960 tracked_peer_sets: None,
961 },
962 );
963 network.start();
964
965 const NUM_PEERS: usize = 100;
967 const MESSAGE_SIZE: usize = 1000; const EFFECTIVE_BPS: usize = 10_000; let mut peers = Vec::with_capacity(NUM_PEERS + 1);
972 let mut senders = Vec::with_capacity(NUM_PEERS + 1);
973 let mut receivers = Vec::with_capacity(NUM_PEERS + 1);
974
975 for i in 0..=NUM_PEERS {
977 let pk = PrivateKey::from_seed(i as u64).public_key();
978 let (sender, receiver) = oracle.control(pk.clone()).register(0).await.unwrap();
979 peers.push(pk);
980 senders.push(sender);
981 receivers.push(receiver);
982 }
983
984 for pk in &peers {
986 oracle
987 .limit_bandwidth(pk.clone(), Some(EFFECTIVE_BPS), Some(EFFECTIVE_BPS))
988 .await
989 .unwrap();
990 }
991
992 for peer in peers.iter().skip(1) {
994 oracle
995 .add_link(
996 peer.clone(),
997 peers[0].clone(),
998 Link {
999 latency: Duration::ZERO,
1000 jitter: Duration::ZERO,
1001 success_rate: 1.0,
1002 },
1003 )
1004 .await
1005 .unwrap();
1006 oracle
1007 .add_link(
1008 peers[0].clone(),
1009 peer.clone(),
1010 Link {
1011 latency: Duration::ZERO,
1012 jitter: Duration::ZERO,
1013 success_rate: 1.0,
1014 },
1015 )
1016 .await
1017 .unwrap();
1018 }
1019
1020 let start = context.current();
1023
1024 let msg = Bytes::from(vec![0u8; MESSAGE_SIZE]);
1027 for peer in peers.iter().skip(1) {
1028 senders[0]
1029 .send(Recipients::One(peer.clone()), msg.clone(), true)
1030 .await
1031 .unwrap();
1032 }
1033
1034 for receiver in receivers.iter_mut().skip(1) {
1036 let (origin, received) = receiver.recv().await.unwrap();
1037 assert_eq!(origin, peers[0]);
1038 assert_eq!(received.len(), MESSAGE_SIZE);
1039 }
1040
1041 let elapsed = context.current().duration_since(start).unwrap();
1042
1043 let expected_ms = (NUM_PEERS * MESSAGE_SIZE * 1000) / EFFECTIVE_BPS;
1045
1046 assert!(
1047 elapsed >= Duration::from_millis(expected_ms as u64),
1048 "One-to-many completed too quickly: {elapsed:?} (expected >= {expected_ms}ms)"
1049 );
1050 assert!(
1051 elapsed < Duration::from_millis((expected_ms as u64) + 500),
1052 "One-to-many took too long: {elapsed:?} (expected ~{expected_ms}ms)"
1053 );
1054
1055 let start = context.current();
1057
1058 let msg = Bytes::from(vec![0; MESSAGE_SIZE]);
1061 for mut sender in senders.into_iter().skip(1) {
1062 sender
1063 .send(Recipients::One(peers[0].clone()), msg.clone(), true)
1064 .await
1065 .unwrap();
1066 }
1067
1068 let mut received_from = HashSet::new();
1070 for _ in 1..=NUM_PEERS {
1071 let (origin, received) = receivers[0].recv().await.unwrap();
1072 assert_eq!(received.len(), MESSAGE_SIZE);
1073 assert!(
1074 received_from.insert(origin.clone()),
1075 "Received duplicate from {origin:?}"
1076 );
1077 }
1078
1079 let elapsed = context.current().duration_since(start).unwrap();
1080
1081 let expected_ms = (NUM_PEERS * MESSAGE_SIZE * 1000) / EFFECTIVE_BPS;
1083
1084 assert!(
1085 elapsed >= Duration::from_millis(expected_ms as u64),
1086 "Many-to-one completed too quickly: {elapsed:?} (expected >= {expected_ms}ms)"
1087 );
1088 assert!(
1089 elapsed < Duration::from_millis((expected_ms as u64) + 500),
1090 "Many-to-one took too long: {elapsed:?} (expected ~{expected_ms}ms)"
1091 );
1092
1093 assert_eq!(received_from.len(), NUM_PEERS);
1095 for peer in peers.iter().skip(1) {
1096 assert!(received_from.contains(peer));
1097 }
1098 });
1099 }
1100
1101 #[test]
1102 fn test_message_ordering() {
1103 let executor = deterministic::Runner::default();
1105 executor.start(|context| async move {
1106 let (network, mut oracle) = Network::new(
1107 context.with_label("network"),
1108 Config {
1109 max_size: 1024 * 1024,
1110 disconnect_on_block: true,
1111 tracked_peer_sets: None,
1112 },
1113 );
1114 network.start();
1115
1116 let pk1 = PrivateKey::from_seed(1).public_key();
1118 let pk2 = PrivateKey::from_seed(2).public_key();
1119 let (mut sender, _) = oracle.control(pk1.clone()).register(0).await.unwrap();
1120 let (_, mut receiver) = oracle.control(pk2.clone()).register(0).await.unwrap();
1121
1122 oracle
1124 .add_link(
1125 pk1.clone(),
1126 pk2.clone(),
1127 Link {
1128 latency: Duration::from_millis(50),
1129 jitter: Duration::from_millis(40),
1130 success_rate: 1.0,
1131 },
1132 )
1133 .await
1134 .unwrap();
1135
1136 let messages = vec![
1138 Bytes::from("message 1"),
1139 Bytes::from("message 2"),
1140 Bytes::from("message 3"),
1141 Bytes::from("message 4"),
1142 Bytes::from("message 5"),
1143 ];
1144
1145 for msg in messages.clone() {
1146 sender
1147 .send(Recipients::One(pk2.clone()), msg, true)
1148 .await
1149 .unwrap();
1150 }
1151
1152 for expected_msg in messages {
1154 let (origin, received_msg) = receiver.recv().await.unwrap();
1155 assert_eq!(origin, pk1);
1156 assert_eq!(received_msg, expected_msg);
1157 }
1158 })
1159 }
1160
1161 #[test]
1162 fn test_high_latency_message_blocks_followup() {
1163 let executor = deterministic::Runner::default();
1164 executor.start(|context| async move {
1165 let (network, mut oracle) = Network::new(
1166 context.with_label("network"),
1167 Config {
1168 max_size: 1024 * 1024,
1169 disconnect_on_block: true,
1170 tracked_peer_sets: None,
1171 },
1172 );
1173 network.start();
1174
1175 let pk1 = PrivateKey::from_seed(1).public_key();
1176 let pk2 = PrivateKey::from_seed(2).public_key();
1177 let (mut sender, _) = oracle.control(pk1.clone()).register(0).await.unwrap();
1178 let (_, mut receiver) = oracle.control(pk2.clone()).register(0).await.unwrap();
1179
1180 const BPS: usize = 1_000;
1181 oracle
1182 .limit_bandwidth(pk1.clone(), Some(BPS), None)
1183 .await
1184 .unwrap();
1185 oracle
1186 .limit_bandwidth(pk2.clone(), None, Some(BPS))
1187 .await
1188 .unwrap();
1189
1190 oracle
1192 .add_link(
1193 pk1.clone(),
1194 pk2.clone(),
1195 Link {
1196 latency: Duration::from_millis(5_000),
1197 jitter: Duration::ZERO,
1198 success_rate: 1.0,
1199 },
1200 )
1201 .await
1202 .unwrap();
1203
1204 let slow = Bytes::from(vec![0u8; 1_000]);
1205 sender
1206 .send(Recipients::One(pk2.clone()), slow.clone(), true)
1207 .await
1208 .unwrap();
1209
1210 oracle.remove_link(pk1.clone(), pk2.clone()).await.unwrap();
1212 oracle
1213 .add_link(
1214 pk1.clone(),
1215 pk2.clone(),
1216 Link {
1217 latency: Duration::from_millis(1),
1218 jitter: Duration::ZERO,
1219 success_rate: 1.0,
1220 },
1221 )
1222 .await
1223 .unwrap();
1224
1225 let fast = Bytes::from(vec![1u8; 1_000]);
1227 sender
1228 .send(Recipients::One(pk2.clone()), fast.clone(), true)
1229 .await
1230 .unwrap();
1231
1232 let start = context.current();
1233 let (origin1, message1) = receiver.recv().await.unwrap();
1234 assert_eq!(origin1, pk1);
1235 assert_eq!(message1, slow);
1236 let first_elapsed = context.current().duration_since(start).unwrap();
1237
1238 let (origin2, message2) = receiver.recv().await.unwrap();
1239 let second_elapsed = context.current().duration_since(start).unwrap();
1240 assert_eq!(origin2, pk1);
1241 assert_eq!(message2, fast);
1242
1243 let egress_time = Duration::from_secs(1);
1244 let slow_latency = Duration::from_millis(5_000);
1245 let expected_first = egress_time + slow_latency;
1246 let tolerance = Duration::from_millis(10);
1247 assert!(
1248 first_elapsed >= expected_first.saturating_sub(tolerance)
1249 && first_elapsed <= expected_first + tolerance,
1250 "slow message arrived outside expected window: {first_elapsed:?} (expected {expected_first:?} ± {tolerance:?})"
1251 );
1252 assert!(
1253 second_elapsed >= first_elapsed,
1254 "fast message arrived before slow transmission completed"
1255 );
1256
1257 let arrival_gap = second_elapsed
1258 .checked_sub(first_elapsed)
1259 .expect("timestamps ordered");
1260 assert!(
1261 arrival_gap >= egress_time.saturating_sub(tolerance)
1262 && arrival_gap <= egress_time + tolerance,
1263 "next arrival deviated from transmit duration (gap = {arrival_gap:?}, expected {egress_time:?} ± {tolerance:?})"
1264 );
1265 })
1266 }
1267
1268 #[test]
1269 fn test_many_to_one_bandwidth_sharing() {
1270 let executor = deterministic::Runner::default();
1271 executor.start(|context| async move {
1272 let (network, mut oracle) = Network::new(
1273 context.with_label("network"),
1274 Config {
1275 max_size: 1024 * 1024,
1276 disconnect_on_block: true,
1277 tracked_peer_sets: None,
1278 },
1279 );
1280 network.start();
1281
1282 let mut senders = Vec::new();
1284 let mut sender_txs = Vec::new();
1285 for i in 0..10 {
1286 let sender = ed25519::PrivateKey::from_seed(i).public_key();
1287 senders.push(sender.clone());
1288 let (tx, _) = oracle.control(sender.clone()).register(0).await.unwrap();
1289 sender_txs.push(tx);
1290
1291 oracle
1293 .limit_bandwidth(sender.clone(), Some(10_000), None)
1294 .await
1295 .unwrap();
1296 }
1297
1298 let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1299 let (_, mut receiver_rx) = oracle.control(receiver.clone()).register(0).await.unwrap();
1300
1301 oracle
1303 .limit_bandwidth(receiver.clone(), None, Some(100_000))
1304 .await
1305 .unwrap();
1306
1307 for sender in &senders {
1309 oracle
1310 .add_link(
1311 sender.clone(),
1312 receiver.clone(),
1313 Link {
1314 latency: Duration::ZERO,
1315 jitter: Duration::ZERO,
1316 success_rate: 1.0,
1317 },
1318 )
1319 .await
1320 .unwrap();
1321 }
1322
1323 let start = context.current();
1324
1325 for (i, mut tx) in sender_txs.into_iter().enumerate() {
1327 let receiver_clone = receiver.clone();
1328 let msg = Bytes::from(vec![i as u8; 10_000]);
1329 tx.send(Recipients::One(receiver_clone), msg, true)
1330 .await
1331 .unwrap();
1332 }
1333
1334 for i in 0..10 {
1337 let (_, _msg) = receiver_rx.recv().await.unwrap();
1338 let recv_time = context.current().duration_since(start).unwrap();
1339
1340 assert!(
1342 recv_time >= Duration::from_millis(950)
1343 && recv_time <= Duration::from_millis(1100),
1344 "Message {i} received at {recv_time:?}, expected ~1s",
1345 );
1346 }
1347 });
1348 }
1349
1350 #[test]
1351 fn test_one_to_many_fast_sender() {
1352 let executor = deterministic::Runner::default();
1355 executor.start(|context| async move {
1356 let (network, mut oracle) = Network::new(
1357 context.with_label("network"),
1358 Config {
1359 max_size: 1024 * 1024,
1360 disconnect_on_block: true,
1361 tracked_peer_sets: None,
1362 },
1363 );
1364 network.start();
1365
1366 let sender = ed25519::PrivateKey::from_seed(0).public_key();
1368 let (sender_tx, _) = oracle.control(sender.clone()).register(0).await.unwrap();
1369
1370 oracle
1372 .limit_bandwidth(sender.clone(), Some(100_000), None)
1373 .await
1374 .unwrap();
1375
1376 let mut receivers = Vec::new();
1378 let mut receiver_rxs = Vec::new();
1379 for i in 0..10 {
1380 let receiver = ed25519::PrivateKey::from_seed(i + 1).public_key();
1381 receivers.push(receiver.clone());
1382 let (_, rx) = oracle.control(receiver.clone()).register(0).await.unwrap();
1383 receiver_rxs.push(rx);
1384
1385 oracle
1387 .limit_bandwidth(receiver.clone(), None, Some(10_000))
1388 .await
1389 .unwrap();
1390
1391 oracle
1393 .add_link(
1394 sender.clone(),
1395 receiver.clone(),
1396 Link {
1397 latency: Duration::ZERO,
1398 jitter: Duration::ZERO,
1399 success_rate: 1.0,
1400 },
1401 )
1402 .await
1403 .unwrap();
1404 }
1405
1406 let start = context.current();
1407
1408 for (i, receiver) in receivers.iter().enumerate() {
1410 let mut sender_tx = sender_tx.clone();
1411 let receiver_clone = receiver.clone();
1412 let msg = Bytes::from(vec![i as u8; 10_000]);
1413 sender_tx
1414 .send(Recipients::One(receiver_clone), msg, true)
1415 .await
1416 .unwrap();
1417 }
1418
1419 for (i, mut rx) in receiver_rxs.into_iter().enumerate() {
1421 let (_, msg) = rx.recv().await.unwrap();
1422 assert_eq!(msg[0], i as u8);
1423 let recv_time = context.current().duration_since(start).unwrap();
1424
1425 assert!(
1427 recv_time >= Duration::from_millis(950)
1428 && recv_time <= Duration::from_millis(1100),
1429 "Receiver {i} received at {recv_time:?}, expected ~1s",
1430 );
1431 }
1432 });
1433 }
1434
1435 #[test]
1436 fn test_many_slow_senders_to_fast_receiver() {
1437 let executor = deterministic::Runner::default();
1440 executor.start(|context| async move {
1441 let (network, mut oracle) = Network::new(
1442 context.with_label("network"),
1443 Config {
1444 max_size: 1024 * 1024,
1445 disconnect_on_block: true,
1446 tracked_peer_sets: None,
1447 },
1448 );
1449 network.start();
1450
1451 let mut senders = Vec::new();
1453 let mut sender_txs = Vec::new();
1454 for i in 0..10 {
1455 let sender = ed25519::PrivateKey::from_seed(i).public_key();
1456 senders.push(sender.clone());
1457 let (tx, _) = oracle.control(sender.clone()).register(0).await.unwrap();
1458 sender_txs.push(tx);
1459
1460 oracle
1462 .limit_bandwidth(sender.clone(), Some(1_000), None)
1463 .await
1464 .unwrap();
1465 }
1466
1467 let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1469 let (_, mut receiver_rx) = oracle.control(receiver.clone()).register(0).await.unwrap();
1470
1471 oracle
1473 .limit_bandwidth(receiver.clone(), None, Some(10_000))
1474 .await
1475 .unwrap();
1476
1477 for sender in &senders {
1479 oracle
1480 .add_link(
1481 sender.clone(),
1482 receiver.clone(),
1483 Link {
1484 latency: Duration::ZERO,
1485 jitter: Duration::ZERO,
1486 success_rate: 1.0,
1487 },
1488 )
1489 .await
1490 .unwrap();
1491 }
1492
1493 let start = context.current();
1494
1495 for (i, mut tx) in sender_txs.into_iter().enumerate() {
1497 let receiver_clone = receiver.clone();
1498 let msg = Bytes::from(vec![i as u8; 1_000]);
1499 tx.send(Recipients::One(receiver_clone), msg, true)
1500 .await
1501 .unwrap();
1502 }
1503
1504 for i in 0..10 {
1510 let (_, _msg) = receiver_rx.recv().await.unwrap();
1511 let recv_time = context.current().duration_since(start).unwrap();
1512
1513 assert!(
1515 recv_time >= Duration::from_millis(950)
1516 && recv_time <= Duration::from_millis(1100),
1517 "Message {i} received at {recv_time:?}, expected ~1s",
1518 );
1519 }
1520 });
1521 }
1522
1523 #[test]
1524 fn test_dynamic_bandwidth_allocation_staggered() {
1525 let executor = deterministic::Runner::default();
1531 executor.start(|context| async move {
1532 let (network, mut oracle) = Network::new(
1533 context.with_label("network"),
1534 Config {
1535 max_size: 1024 * 1024,
1536 disconnect_on_block: true,
1537 tracked_peer_sets: None,
1538 },
1539 );
1540 network.start();
1541
1542 let mut senders = Vec::new();
1544 let mut sender_txs = Vec::new();
1545 for i in 0..3 {
1546 let sender = ed25519::PrivateKey::from_seed(i).public_key();
1547 senders.push(sender.clone());
1548 let (tx, _) = oracle.control(sender.clone()).register(0).await.unwrap();
1549 sender_txs.push(tx);
1550
1551 oracle
1553 .limit_bandwidth(sender.clone(), Some(30_000), None)
1554 .await
1555 .unwrap();
1556 }
1557
1558 let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1560 let (_, mut receiver_rx) = oracle.control(receiver.clone()).register(0).await.unwrap();
1561 oracle
1562 .limit_bandwidth(receiver.clone(), None, Some(30_000))
1563 .await
1564 .unwrap();
1565
1566 for sender in &senders {
1568 oracle
1569 .add_link(
1570 sender.clone(),
1571 receiver.clone(),
1572 Link {
1573 latency: Duration::from_millis(1),
1574 jitter: Duration::ZERO,
1575 success_rate: 1.0,
1576 },
1577 )
1578 .await
1579 .unwrap();
1580 }
1581
1582 let start = context.current();
1583
1584 let mut tx0 = sender_txs[0].clone();
1588 let rx_clone = receiver.clone();
1589 context.clone().spawn(move |_| async move {
1590 let msg = Bytes::from(vec![0u8; 30_000]);
1591 tx0.send(Recipients::One(rx_clone), msg, true)
1592 .await
1593 .unwrap();
1594 });
1595
1596 let mut tx1 = sender_txs[1].clone();
1600 let rx_clone = receiver.clone();
1601 context.clone().spawn(move |context| async move {
1602 context.sleep(Duration::from_millis(500)).await;
1603 let msg = Bytes::from(vec![1u8; 30_000]);
1604 tx1.send(Recipients::One(rx_clone), msg, true)
1605 .await
1606 .unwrap();
1607 });
1608
1609 let mut tx2 = sender_txs[2].clone();
1612 let rx_clone = receiver.clone();
1613 context.clone().spawn(move |context| async move {
1614 context.sleep(Duration::from_millis(1500)).await;
1615 let msg = Bytes::from(vec![2u8; 15_000]);
1616 tx2.send(Recipients::One(rx_clone), msg, true)
1617 .await
1618 .unwrap();
1619 });
1620
1621 let (_, msg0) = receiver_rx.recv().await.unwrap();
1625 assert_eq!(msg0[0], 0);
1626 let t0 = context.current().duration_since(start).unwrap();
1627 assert!(
1628 t0 >= Duration::from_millis(1490) && t0 <= Duration::from_millis(1600),
1629 "Message 0 received at {t0:?}, expected ~1.5s",
1630 );
1631
1632 let (_, msg_a) = receiver_rx.recv().await.unwrap();
1636 let t_a = context.current().duration_since(start).unwrap();
1637
1638 let (_, msg_b) = receiver_rx.recv().await.unwrap();
1639 let t_b = context.current().duration_since(start).unwrap();
1640
1641 let (msg1, t1, msg2, t2) = if msg_a[0] == 1 {
1643 (msg_a, t_a, msg_b, t_b)
1644 } else {
1645 (msg_b, t_b, msg_a, t_a)
1646 };
1647
1648 assert_eq!(msg1[0], 1);
1649 assert_eq!(msg2[0], 2);
1650
1651 assert!(
1656 t1 >= Duration::from_millis(1500) && t1 <= Duration::from_millis(2600),
1657 "Message 1 received at {t1:?}, expected between 1.5s-2.6s",
1658 );
1659
1660 assert!(
1661 t2 >= Duration::from_millis(1500) && t2 <= Duration::from_millis(2600),
1662 "Message 2 received at {t2:?}, expected between 1.5s-2.6s",
1663 );
1664 });
1665 }
1666
1667 #[test]
1668 fn test_dynamic_bandwidth_varied_sizes() {
1669 let executor = deterministic::Runner::default();
1672 executor.start(|context| async move {
1673 let (network, mut oracle) = Network::new(
1674 context.with_label("network"),
1675 Config {
1676 max_size: 1024 * 1024,
1677 disconnect_on_block: true,
1678 tracked_peer_sets: None,
1679 },
1680 );
1681 network.start();
1682
1683 let mut senders = Vec::new();
1685 let mut sender_txs = Vec::new();
1686 for i in 0..3 {
1687 let sender = ed25519::PrivateKey::from_seed(i).public_key();
1688 senders.push(sender.clone());
1689 let (tx, _) = oracle.control(sender.clone()).register(0).await.unwrap();
1690 sender_txs.push(tx);
1691
1692 oracle
1694 .limit_bandwidth(sender.clone(), None, None)
1695 .await
1696 .unwrap();
1697 }
1698
1699 let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1701 let (_, mut receiver_rx) = oracle.control(receiver.clone()).register(0).await.unwrap();
1702 oracle
1703 .limit_bandwidth(receiver.clone(), None, Some(30_000))
1704 .await
1705 .unwrap();
1706
1707 for sender in &senders {
1709 oracle
1710 .add_link(
1711 sender.clone(),
1712 receiver.clone(),
1713 Link {
1714 latency: Duration::from_millis(1),
1715 jitter: Duration::ZERO,
1716 success_rate: 1.0,
1717 },
1718 )
1719 .await
1720 .unwrap();
1721 }
1722
1723 let start = context.current();
1724
1725 let sizes = [10_000, 20_000, 30_000];
1731 for (i, (mut tx, size)) in sender_txs.into_iter().zip(sizes.iter()).enumerate() {
1732 let rx_clone = receiver.clone();
1733 let msg_size = *size;
1734 let msg = Bytes::from(vec![i as u8; msg_size]);
1735 tx.send(Recipients::One(rx_clone), msg, true).await.unwrap();
1736 }
1737
1738 let mut messages = Vec::new();
1742 for _ in 0..3 {
1743 let (_, msg) = receiver_rx.recv().await.unwrap();
1744 let t = context.current().duration_since(start).unwrap();
1745 messages.push((msg[0] as usize, msg.len(), t));
1746 }
1747
1748 assert_eq!(messages.len(), 3);
1753
1754 let max_time = messages.iter().map(|&(_, _, t)| t).max().unwrap();
1756 assert!(
1757 max_time >= Duration::from_millis(2000),
1758 "Total time {max_time:?} should be at least 2s for 60KB at 30KB/s",
1759 );
1760 });
1761 }
1762
1763 #[test]
1764 fn test_bandwidth_pipe_reservation_duration() {
1765 let executor = deterministic::Runner::default();
1768 executor.start(|context| async move {
1769 let (network, mut oracle) = Network::new(
1770 context.with_label("network"),
1771 Config {
1772 max_size: 1024 * 1024,
1773 disconnect_on_block: true,
1774 tracked_peer_sets: None,
1775 },
1776 );
1777 network.start();
1778
1779 let sender = PrivateKey::from_seed(1).public_key();
1781 let receiver = PrivateKey::from_seed(2).public_key();
1782
1783 let (sender_tx, _) = oracle.control(sender.clone()).register(0).await.unwrap();
1784 let (_, mut receiver_rx) = oracle.control(receiver.clone()).register(0).await.unwrap();
1785
1786 oracle
1788 .limit_bandwidth(sender.clone(), Some(1000), None)
1789 .await
1790 .unwrap();
1791 oracle
1792 .limit_bandwidth(receiver.clone(), None, Some(1000))
1793 .await
1794 .unwrap();
1795
1796 oracle
1798 .add_link(
1799 sender.clone(),
1800 receiver.clone(),
1801 Link {
1802 latency: Duration::from_secs(1), jitter: Duration::ZERO,
1804 success_rate: 1.0,
1805 },
1806 )
1807 .await
1808 .unwrap();
1809
1810 let start = context.current();
1821
1822 for i in 0..3 {
1824 let mut sender_tx = sender_tx.clone();
1825 let receiver = receiver.clone();
1826 let msg = Bytes::from(vec![i; 500]);
1827 sender_tx
1828 .send(Recipients::One(receiver), msg, false)
1829 .await
1830 .unwrap();
1831 }
1832
1833 let mut receive_times = Vec::new();
1835 for i in 0..3 {
1836 let (_, received) = receiver_rx.recv().await.unwrap();
1837 receive_times.push(context.current().duration_since(start).unwrap());
1838 assert_eq!(received[0], i);
1839 }
1840
1841 for (i, time) in receive_times.iter().enumerate() {
1846 let expected_min = (i as u64 * 500) + 1500;
1847 let expected_max = expected_min + 100;
1848
1849 assert!(
1850 *time >= Duration::from_millis(expected_min)
1851 && *time < Duration::from_millis(expected_max),
1852 "Message {} should arrive at ~{}ms, got {:?}",
1853 i + 1,
1854 expected_min,
1855 time
1856 );
1857 }
1858 });
1859 }
1860
1861 #[test]
1862 fn test_dynamic_bandwidth_affects_new_transfers() {
1863 let executor = deterministic::Runner::default();
1866 executor.start(|context| async move {
1867 let (network, mut oracle) = Network::new(
1868 context.with_label("network"),
1869 Config {
1870 max_size: 1024 * 1024,
1871 disconnect_on_block: true,
1872 tracked_peer_sets: None,
1873 },
1874 );
1875 network.start();
1876
1877 let pk_sender = PrivateKey::from_seed(1).public_key();
1878 let pk_receiver = PrivateKey::from_seed(2).public_key();
1879
1880 let (mut sender_tx, _) = oracle.control(pk_sender.clone()).register(0).await.unwrap();
1882 let (_, mut receiver_rx) = oracle
1883 .control(pk_receiver.clone())
1884 .register(0)
1885 .await
1886 .unwrap();
1887 oracle
1888 .add_link(
1889 pk_sender.clone(),
1890 pk_receiver.clone(),
1891 Link {
1892 latency: Duration::from_millis(1), jitter: Duration::ZERO,
1894 success_rate: 1.0,
1895 },
1896 )
1897 .await
1898 .unwrap();
1899
1900 oracle
1902 .limit_bandwidth(pk_sender.clone(), Some(10_000), None)
1903 .await
1904 .unwrap();
1905 oracle
1906 .limit_bandwidth(pk_receiver.clone(), None, Some(10_000))
1907 .await
1908 .unwrap();
1909
1910 let msg1 = Bytes::from(vec![1u8; 20_000]); let start_time = context.current();
1913 sender_tx
1914 .send(Recipients::One(pk_receiver.clone()), msg1.clone(), false)
1915 .await
1916 .unwrap();
1917
1918 let (_sender, received_msg1) = receiver_rx.recv().await.unwrap();
1920 let msg1_time = context.current().duration_since(start_time).unwrap();
1921 assert_eq!(received_msg1.len(), 20_000);
1922 assert!(
1923 msg1_time >= Duration::from_millis(1999)
1924 && msg1_time <= Duration::from_millis(2010),
1925 "First message should take ~2s, got {msg1_time:?}",
1926 );
1927
1928 oracle
1930 .limit_bandwidth(pk_sender.clone(), Some(2_000), None)
1931 .await
1932 .unwrap();
1933
1934 let msg2 = Bytes::from(vec![2u8; 10_000]); let msg2_start = context.current();
1937 sender_tx
1938 .send(Recipients::One(pk_receiver.clone()), msg2.clone(), false)
1939 .await
1940 .unwrap();
1941
1942 let (_sender, received_msg2) = receiver_rx.recv().await.unwrap();
1944 let msg2_time = context.current().duration_since(msg2_start).unwrap();
1945 assert_eq!(received_msg2.len(), 10_000);
1946 assert!(
1947 msg2_time >= Duration::from_millis(4999)
1948 && msg2_time <= Duration::from_millis(5010),
1949 "Second message should take ~5s at reduced bandwidth, got {msg2_time:?}",
1950 );
1951 });
1952 }
1953
1954 #[test]
1955 fn test_zero_receiver_ingress_bandwidth() {
1956 let executor = deterministic::Runner::default();
1957 executor.start(|context| async move {
1958 let (network, mut oracle) = Network::new(
1959 context.with_label("network"),
1960 Config {
1961 max_size: 1024 * 1024,
1962 disconnect_on_block: true,
1963 tracked_peer_sets: None,
1964 },
1965 );
1966 network.start();
1967
1968 let pk_sender = PrivateKey::from_seed(1).public_key();
1969 let pk_receiver = PrivateKey::from_seed(2).public_key();
1970
1971 let (mut sender_tx, _) = oracle.control(pk_sender.clone()).register(0).await.unwrap();
1973 let (_, mut receiver_rx) = oracle
1974 .control(pk_receiver.clone())
1975 .register(0)
1976 .await
1977 .unwrap();
1978 oracle
1979 .add_link(
1980 pk_sender.clone(),
1981 pk_receiver.clone(),
1982 Link {
1983 latency: Duration::ZERO,
1984 jitter: Duration::ZERO,
1985 success_rate: 1.0,
1986 },
1987 )
1988 .await
1989 .unwrap();
1990
1991 oracle
1993 .limit_bandwidth(pk_receiver.clone(), None, Some(0))
1994 .await
1995 .unwrap();
1996
1997 let msg1 = Bytes::from(vec![1u8; 20_000]); let sent = sender_tx
2000 .send(Recipients::One(pk_receiver.clone()), msg1.clone(), false)
2001 .await
2002 .unwrap();
2003 assert_eq!(sent.len(), 1);
2004 assert_eq!(sent[0], pk_receiver);
2005
2006 select! {
2008 _ = receiver_rx.recv() => {
2009 panic!("unexpected message");
2010 },
2011 _ = context.sleep(Duration::from_secs(10)) => {},
2012 }
2013
2014 oracle
2016 .limit_bandwidth(pk_receiver.clone(), None, None)
2017 .await
2018 .unwrap();
2019
2020 select! {
2022 _ = receiver_rx.recv() => {},
2023 _ = context.sleep(Duration::from_secs(1)) => {
2024 panic!("timeout");
2025 },
2026 }
2027 });
2028 }
2029
2030 #[test]
2031 fn test_zero_sender_egress_bandwidth() {
2032 let executor = deterministic::Runner::default();
2033 executor.start(|context| async move {
2034 let (network, mut oracle) = Network::new(
2035 context.with_label("network"),
2036 Config {
2037 max_size: 1024 * 1024,
2038 disconnect_on_block: true,
2039 tracked_peer_sets: None,
2040 },
2041 );
2042 network.start();
2043
2044 let pk_sender = PrivateKey::from_seed(1).public_key();
2045 let pk_receiver = PrivateKey::from_seed(2).public_key();
2046
2047 let (mut sender_tx, _) = oracle.control(pk_sender.clone()).register(0).await.unwrap();
2049 let (_, mut receiver_rx) = oracle
2050 .control(pk_receiver.clone())
2051 .register(0)
2052 .await
2053 .unwrap();
2054 oracle
2055 .add_link(
2056 pk_sender.clone(),
2057 pk_receiver.clone(),
2058 Link {
2059 latency: Duration::ZERO,
2060 jitter: Duration::ZERO,
2061 success_rate: 1.0,
2062 },
2063 )
2064 .await
2065 .unwrap();
2066
2067 oracle
2069 .limit_bandwidth(pk_sender.clone(), Some(0), None)
2070 .await
2071 .unwrap();
2072
2073 let msg1 = Bytes::from(vec![1u8; 20_000]); let sent = sender_tx
2076 .send(Recipients::One(pk_receiver.clone()), msg1.clone(), false)
2077 .await
2078 .unwrap();
2079 assert_eq!(sent.len(), 1);
2080 assert_eq!(sent[0], pk_receiver);
2081
2082 select! {
2084 _ = receiver_rx.recv() => {
2085 panic!("unexpected message");
2086 },
2087 _ = context.sleep(Duration::from_secs(10)) => {},
2088 }
2089
2090 oracle
2092 .limit_bandwidth(pk_sender.clone(), None, None)
2093 .await
2094 .unwrap();
2095
2096 select! {
2098 _ = receiver_rx.recv() => {},
2099 _ = context.sleep(Duration::from_secs(1)) => {
2100 panic!("timeout");
2101 },
2102 }
2103 });
2104 }
2105
2106 #[test]
2107 fn register_peer_set() {
2108 let executor = deterministic::Runner::default();
2109 executor.start(|context| async move {
2110 let (network, mut oracle) = Network::new(
2111 context.with_label("network"),
2112 Config {
2113 max_size: 1024 * 1024,
2114 disconnect_on_block: true,
2115 tracked_peer_sets: Some(3),
2116 },
2117 );
2118 network.start();
2119
2120 assert_eq!(oracle.peer_set(0).await, Some([].into()));
2121
2122 let pk1 = PrivateKey::from_seed(1).public_key();
2123 let pk2 = PrivateKey::from_seed(2).public_key();
2124 oracle.update(0xFF, [pk1.clone(), pk2.clone()].into()).await;
2125
2126 assert_eq!(oracle.peer_set(0xFF).await.unwrap(), [pk1, pk2].into());
2127 });
2128 }
2129
2130 #[test]
2131 fn test_peer_set_window_management() {
2132 let executor = deterministic::Runner::default();
2133 executor.start(|context| async move {
2134 let (network, mut oracle) = Network::new(
2135 context.with_label("network"),
2136 Config {
2137 max_size: 1024 * 1024,
2138 disconnect_on_block: true,
2139 tracked_peer_sets: Some(2), },
2141 );
2142 network.start();
2143
2144 let pk1 = PrivateKey::from_seed(1).public_key();
2146 let pk2 = PrivateKey::from_seed(2).public_key();
2147 let pk3 = PrivateKey::from_seed(3).public_key();
2148 let pk4 = PrivateKey::from_seed(4).public_key();
2149
2150 oracle
2152 .update(1, vec![pk1.clone(), pk2.clone()].into())
2153 .await;
2154
2155 let (mut sender1, _receiver1) = oracle.control(pk1.clone()).register(0).await.unwrap();
2157 let (mut sender2, _receiver2) = oracle.control(pk2.clone()).register(0).await.unwrap();
2158 let (mut sender3, _receiver3) = oracle.control(pk3.clone()).register(0).await.unwrap();
2159 let (_mut_sender4, _receiver4) = oracle.control(pk4.clone()).register(0).await.unwrap();
2160
2161 for peer_a in &[pk1.clone(), pk2.clone(), pk3.clone(), pk4.clone()] {
2163 for peer_b in &[pk1.clone(), pk2.clone(), pk3.clone(), pk4.clone()] {
2164 if peer_a != peer_b {
2165 oracle
2166 .add_link(
2167 peer_a.clone(),
2168 peer_b.clone(),
2169 Link {
2170 latency: Duration::from_millis(1),
2171 jitter: Duration::ZERO,
2172 success_rate: 1.0,
2173 },
2174 )
2175 .await
2176 .unwrap();
2177 }
2178 }
2179 }
2180
2181 let sent = sender1
2183 .send(Recipients::One(pk2.clone()), Bytes::from("msg1"), false)
2184 .await
2185 .unwrap();
2186 assert_eq!(sent.len(), 1);
2187
2188 let sent = sender1
2190 .send(Recipients::One(pk3.clone()), Bytes::from("msg2"), false)
2191 .await
2192 .unwrap();
2193 assert_eq!(sent.len(), 0);
2194
2195 oracle
2197 .update(2, vec![pk2.clone(), pk3.clone()].into())
2198 .await;
2199
2200 let sent = sender1
2202 .send(Recipients::One(pk3.clone()), Bytes::from("msg3"), false)
2203 .await
2204 .unwrap();
2205 assert_eq!(sent.len(), 1);
2206
2207 oracle
2209 .update(3, vec![pk3.clone(), pk4.clone()].into())
2210 .await;
2211
2212 let sent = sender2
2215 .send(Recipients::One(pk1.clone()), Bytes::from("msg4"), false)
2216 .await
2217 .unwrap();
2218 assert_eq!(sent.len(), 0);
2219
2220 let sent = sender2
2222 .send(Recipients::One(pk3.clone()), Bytes::from("msg5"), false)
2223 .await
2224 .unwrap();
2225 assert_eq!(sent.len(), 1);
2226
2227 let sent = sender3
2229 .send(Recipients::One(pk4.clone()), Bytes::from("msg6"), false)
2230 .await
2231 .unwrap();
2232 assert_eq!(sent.len(), 1);
2233
2234 let peer_set_2 = oracle.peer_set(2).await.unwrap();
2236 assert!(peer_set_2.as_ref().contains(&pk2));
2237 assert!(peer_set_2.as_ref().contains(&pk3));
2238
2239 let peer_set_3 = oracle.peer_set(3).await.unwrap();
2240 assert!(peer_set_3.as_ref().contains(&pk3));
2241 assert!(peer_set_3.as_ref().contains(&pk4));
2242
2243 assert!(oracle.peer_set(1).await.is_none());
2245 });
2246 }
2247
2248 #[test]
2249 fn test_subscribe_to_peer_sets() {
2250 let executor = deterministic::Runner::default();
2251 executor.start(|context| async move {
2252 let (network, mut oracle) = Network::new(
2253 context.with_label("network"),
2254 Config {
2255 max_size: 1024 * 1024,
2256 disconnect_on_block: true,
2257 tracked_peer_sets: Some(2),
2258 },
2259 );
2260 network.start();
2261
2262 let mut subscription = oracle.subscribe().await;
2264
2265 let pk1 = PrivateKey::from_seed(1).public_key();
2267 let pk2 = PrivateKey::from_seed(2).public_key();
2268 let pk3 = PrivateKey::from_seed(3).public_key();
2269
2270 oracle
2272 .update(1, vec![pk1.clone(), pk2.clone()].into())
2273 .await;
2274
2275 let (peer_set_id, peer_set, all) = subscription.next().await.unwrap();
2277 assert_eq!(peer_set_id, 1);
2278 assert_eq!(peer_set, vec![pk1.clone(), pk2.clone()].into());
2279 assert_eq!(all, vec![pk1.clone(), pk2.clone()].into());
2280
2281 oracle
2283 .update(2, vec![pk2.clone(), pk3.clone()].into())
2284 .await;
2285
2286 let (peer_set_id, peer_set, all) = subscription.next().await.unwrap();
2288 assert_eq!(peer_set_id, 2);
2289 assert_eq!(peer_set, vec![pk2.clone(), pk3.clone()].into());
2290 assert_eq!(all, vec![pk1.clone(), pk2.clone(), pk3.clone()].into());
2291
2292 oracle
2294 .update(3, vec![pk1.clone(), pk3.clone()].into())
2295 .await;
2296
2297 let (peer_set_id, peer_set, all) = subscription.next().await.unwrap();
2299 assert_eq!(peer_set_id, 3);
2300 assert_eq!(peer_set, vec![pk1.clone(), pk3.clone()].into());
2301 assert_eq!(all, vec![pk1.clone(), pk2.clone(), pk3.clone()].into());
2302
2303 oracle
2305 .update(4, vec![pk1.clone(), pk3.clone()].into())
2306 .await;
2307
2308 let (peer_set_id, peer_set, all) = subscription.next().await.unwrap();
2310 assert_eq!(peer_set_id, 4);
2311 assert_eq!(peer_set, vec![pk1.clone(), pk3.clone()].into());
2312 assert_eq!(all, vec![pk1.clone(), pk3.clone()].into());
2313 });
2314 }
2315
2316 #[test]
2317 fn test_multiple_subscriptions() {
2318 let executor = deterministic::Runner::default();
2319 executor.start(|context| async move {
2320 let (network, mut oracle) = Network::new(
2321 context.with_label("network"),
2322 Config {
2323 max_size: 1024 * 1024,
2324 disconnect_on_block: true,
2325 tracked_peer_sets: Some(3),
2326 },
2327 );
2328 network.start();
2329
2330 let mut subscription1 = oracle.subscribe().await;
2332 let mut subscription2 = oracle.subscribe().await;
2333 let mut subscription3 = oracle.subscribe().await;
2334
2335 let pk1 = PrivateKey::from_seed(1).public_key();
2337 let pk2 = PrivateKey::from_seed(2).public_key();
2338
2339 oracle
2341 .update(1, vec![pk1.clone(), pk2.clone()].into())
2342 .await;
2343
2344 let (id1, _, _) = subscription1.next().await.unwrap();
2346 let (id2, _, _) = subscription2.next().await.unwrap();
2347 let (id3, _, _) = subscription3.next().await.unwrap();
2348
2349 assert_eq!(id1, 1);
2350 assert_eq!(id2, 1);
2351 assert_eq!(id3, 1);
2352
2353 drop(subscription2);
2355
2356 oracle
2358 .update(2, vec![pk1.clone(), pk2.clone()].into())
2359 .await;
2360
2361 let (id1, _, _) = subscription1.next().await.unwrap();
2363 let (id3, _, _) = subscription3.next().await.unwrap();
2364
2365 assert_eq!(id1, 2);
2366 assert_eq!(id3, 2);
2367 });
2368 }
2369}