1mod bandwidth;
144mod ingress;
145mod metrics;
146mod network;
147mod transmitter;
148
149use thiserror::Error;
150
151#[derive(Debug, Error)]
153pub enum Error {
154 #[error("message too large: {0}")]
155 MessageTooLarge(usize),
156 #[error("network closed")]
157 NetworkClosed,
158 #[error("not valid to link self")]
159 LinkingSelf,
160 #[error("link already exists")]
161 LinkExists,
162 #[error("link missing")]
163 LinkMissing,
164 #[error("invalid success rate (must be in [0, 1]): {0}")]
165 InvalidSuccessRate(f64),
166 #[error("send_frame failed")]
167 SendFrameFailed,
168 #[error("recv_frame failed")]
169 RecvFrameFailed,
170 #[error("bind failed")]
171 BindFailed,
172 #[error("accept failed")]
173 AcceptFailed,
174 #[error("dial failed")]
175 DialFailed,
176 #[error("peer missing")]
177 PeerMissing,
178}
179
180pub use ingress::{Control, Link, Manager, Oracle, SocketManager};
181pub use network::{
182 Config, ConnectedPeerProvider, Network, Receiver, Sender, SplitForwarder, SplitOrigin,
183 SplitRouter, SplitSender, SplitTarget, UnlimitedSender,
184};
185
186#[cfg(test)]
187mod tests {
188 use super::*;
189 use crate::{Address, Ingress, Manager, Receiver, Recipients, Sender};
190 use bytes::Bytes;
191 use commonware_cryptography::{
192 ed25519::{self, PrivateKey, PublicKey},
193 Signer as _,
194 };
195 use commonware_macros::select;
196 use commonware_runtime::{deterministic, Clock, Metrics, Quota, Runner, Spawner};
197 use commonware_utils::{hostname, ordered::Map, NZU32};
198 use futures::{channel::mpsc, SinkExt, StreamExt};
199 use rand::Rng;
200 use std::{
201 collections::{BTreeMap, HashMap, HashSet},
202 net::SocketAddr,
203 num::NonZeroU32,
204 time::Duration,
205 };
206
207 const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
209
210 fn simulate_messages(seed: u64, size: usize) -> (String, Vec<usize>) {
211 let executor = deterministic::Runner::seeded(seed);
212 executor.start(|context| async move {
213 let (network, mut oracle) = Network::new(
215 context.with_label("network"),
216 Config {
217 max_size: 1024 * 1024,
218 disconnect_on_block: true,
219 tracked_peer_sets: None,
220 },
221 );
222
223 network.start();
225
226 let mut agents = BTreeMap::new();
228 let (seen_sender, mut seen_receiver) = mpsc::channel(1024);
229 for i in 0..size {
230 let pk = PrivateKey::from_seed(i as u64).public_key();
231 let (sender, mut receiver) = oracle
232 .control(pk.clone())
233 .register(0, TEST_QUOTA)
234 .await
235 .unwrap();
236 agents.insert(pk, sender);
237 let mut agent_sender = seen_sender.clone();
238 context
239 .with_label("agent_receiver")
240 .spawn(move |_| async move {
241 for _ in 0..size {
242 receiver.recv().await.unwrap();
243 }
244 agent_sender.send(i).await.unwrap();
245
246 });
248 }
249
250 let only_inbound = PrivateKey::from_seed(0).public_key();
252 for agent in agents.keys() {
253 if agent == &only_inbound {
254 continue;
256 }
257 for other in agents.keys() {
258 let result = oracle
259 .add_link(
260 agent.clone(),
261 other.clone(),
262 Link {
263 latency: Duration::from_millis(5),
264 jitter: Duration::from_millis(2),
265 success_rate: 0.75,
266 },
267 )
268 .await;
269 if agent == other {
270 assert!(matches!(result, Err(Error::LinkingSelf)));
271 } else {
272 assert!(result.is_ok());
273 }
274 }
275 }
276
277 context
279 .with_label("agent_sender")
280 .spawn(|mut context| async move {
281 let keys = agents.keys().collect::<Vec<_>>();
283
284 loop {
286 let index = context.gen_range(0..keys.len());
287 let sender = keys[index];
288 let msg = format!("hello from {sender:?}");
289 let msg = Bytes::from(msg);
290 let mut message_sender = agents.get(sender).unwrap().clone();
291 let sent = message_sender
292 .send(Recipients::All, msg.clone(), false)
293 .await
294 .unwrap();
295 if sender == &only_inbound {
296 assert_eq!(sent.len(), 0);
297 } else {
298 assert_eq!(sent.len(), keys.len() - 1);
299 }
300 }
301 });
302
303 let mut results = Vec::new();
305 for _ in 0..size {
306 results.push(seen_receiver.next().await.unwrap());
307 }
308 (context.auditor().state(), results)
309 })
310 }
311
312 fn compare_outputs(seeds: u64, size: usize) {
313 let mut outputs = Vec::new();
315 for seed in 0..seeds {
316 outputs.push(simulate_messages(seed, size));
317 }
318
319 for seed in 0..seeds {
321 let output = simulate_messages(seed, size);
322 assert_eq!(output, outputs[seed as usize]);
323 }
324 }
325
326 #[test]
327 fn test_determinism() {
328 compare_outputs(25, 25);
329 }
330
331 #[test]
332 fn test_message_too_big() {
333 let executor = deterministic::Runner::default();
334 executor.start(|mut context| async move {
335 let (network, oracle) = Network::new(
337 context.with_label("network"),
338 Config {
339 max_size: 1024 * 1024,
340 disconnect_on_block: true,
341 tracked_peer_sets: None,
342 },
343 );
344
345 network.start();
347
348 let mut agents = HashMap::new();
350 for i in 0..10 {
351 let pk = PrivateKey::from_seed(i as u64).public_key();
352 let (sender, _) = oracle
353 .control(pk.clone())
354 .register(0, TEST_QUOTA)
355 .await
356 .unwrap();
357 agents.insert(pk, sender);
358 }
359
360 let keys = agents.keys().collect::<Vec<_>>();
362 let index = context.gen_range(0..keys.len());
363 let sender = keys[index];
364 let mut message_sender = agents.get(sender).unwrap().clone();
365 let mut msg = vec![0u8; 1024 * 1024 + 1];
366 context.fill(&mut msg[..]);
367 let result = message_sender
368 .send(Recipients::All, msg.into(), false)
369 .await
370 .unwrap_err();
371
372 assert!(matches!(result, Error::MessageTooLarge(_)));
374 });
375 }
376
377 #[test]
378 fn test_linking_self() {
379 let executor = deterministic::Runner::default();
380 executor.start(|context| async move {
381 let (network, mut oracle) = Network::new(
383 context.with_label("network"),
384 Config {
385 max_size: 1024 * 1024,
386 disconnect_on_block: true,
387 tracked_peer_sets: None,
388 },
389 );
390
391 network.start();
393
394 let pk = PrivateKey::from_seed(0).public_key();
396 oracle
397 .control(pk.clone())
398 .register(0, TEST_QUOTA)
399 .await
400 .unwrap();
401
402 let result = oracle
404 .add_link(
405 pk.clone(),
406 pk,
407 Link {
408 latency: Duration::from_millis(5),
409 jitter: Duration::from_millis(2),
410 success_rate: 0.75,
411 },
412 )
413 .await;
414
415 assert!(matches!(result, Err(Error::LinkingSelf)));
417 });
418 }
419
420 #[test]
421 fn test_duplicate_channel() {
422 let executor = deterministic::Runner::default();
423 executor.start(|context| async move {
424 let (network, mut oracle) = Network::new(
426 context.with_label("network"),
427 Config {
428 max_size: 1024 * 1024,
429 disconnect_on_block: true,
430 tracked_peer_sets: None,
431 },
432 );
433
434 network.start();
436
437 let my_pk = PrivateKey::from_seed(0).public_key();
439 let other_pk = PrivateKey::from_seed(1).public_key();
440 oracle
441 .add_link(
442 my_pk.clone(),
443 other_pk.clone(),
444 Link {
445 latency: Duration::from_millis(10),
446 jitter: Duration::from_millis(1),
447 success_rate: 1.0,
448 },
449 )
450 .await
451 .unwrap();
452 oracle
453 .add_link(
454 other_pk.clone(),
455 my_pk.clone(),
456 Link {
457 latency: Duration::from_millis(10),
458 jitter: Duration::from_millis(1),
459 success_rate: 1.0,
460 },
461 )
462 .await
463 .unwrap();
464
465 let (mut my_sender, mut my_receiver) = oracle
467 .control(my_pk.clone())
468 .register(0, TEST_QUOTA)
469 .await
470 .unwrap();
471 let (mut other_sender, mut other_receiver) = oracle
472 .control(other_pk.clone())
473 .register(0, TEST_QUOTA)
474 .await
475 .unwrap();
476
477 let msg = Bytes::from("hello");
479 my_sender
480 .send(Recipients::One(other_pk.clone()), msg.clone(), false)
481 .await
482 .unwrap();
483 let (from, message) = other_receiver.recv().await.unwrap();
484 assert_eq!(from, my_pk);
485 assert_eq!(message, msg);
486 other_sender
487 .send(Recipients::One(my_pk.clone()), msg.clone(), false)
488 .await
489 .unwrap();
490 let (from, message) = my_receiver.recv().await.unwrap();
491 assert_eq!(from, other_pk);
492 assert_eq!(message, msg);
493
494 let (mut my_sender_2, mut my_receiver_2) = oracle
496 .control(my_pk.clone())
497 .register(0, TEST_QUOTA)
498 .await
499 .unwrap();
500
501 let msg = Bytes::from("hello again");
503 my_sender_2
504 .send(Recipients::One(other_pk.clone()), msg.clone(), false)
505 .await
506 .unwrap();
507 let (from, message) = other_receiver.recv().await.unwrap();
508 assert_eq!(from, my_pk);
509 assert_eq!(message, msg);
510 other_sender
511 .send(Recipients::One(my_pk.clone()), msg.clone(), false)
512 .await
513 .unwrap();
514 let (from, message) = my_receiver_2.recv().await.unwrap();
515 assert_eq!(from, other_pk);
516 assert_eq!(message, msg);
517
518 assert!(matches!(
520 my_receiver.recv().await,
521 Err(Error::NetworkClosed)
522 ));
523
524 assert!(matches!(
526 my_sender
527 .send(Recipients::One(other_pk.clone()), msg.clone(), false)
528 .await,
529 Err(Error::NetworkClosed)
530 ));
531 });
532 }
533
534 #[test]
535 fn test_invalid_success_rate() {
536 let executor = deterministic::Runner::default();
537 executor.start(|context| async move {
538 let (network, mut oracle) = Network::new(
540 context.with_label("network"),
541 Config {
542 max_size: 1024 * 1024,
543 disconnect_on_block: true,
544 tracked_peer_sets: None,
545 },
546 );
547
548 network.start();
550
551 let pk1 = PrivateKey::from_seed(0).public_key();
553 let pk2 = PrivateKey::from_seed(1).public_key();
554 oracle
555 .control(pk1.clone())
556 .register(0, TEST_QUOTA)
557 .await
558 .unwrap();
559 oracle
560 .control(pk2.clone())
561 .register(0, TEST_QUOTA)
562 .await
563 .unwrap();
564
565 let result = oracle
567 .add_link(
568 pk1,
569 pk2,
570 Link {
571 latency: Duration::from_millis(5),
572 jitter: Duration::from_millis(2),
573 success_rate: 1.5,
574 },
575 )
576 .await;
577
578 assert!(matches!(result, Err(Error::InvalidSuccessRate(_))));
580 });
581 }
582
583 #[test]
584 fn test_add_link_before_channel_registration() {
585 let executor = deterministic::Runner::default();
586 executor.start(|context| async move {
587 let (network, mut oracle) = Network::new(
589 context.with_label("network"),
590 Config {
591 max_size: 1024 * 1024,
592 disconnect_on_block: true,
593 tracked_peer_sets: Some(3),
594 },
595 );
596 network.start();
597
598 let pk1 = PrivateKey::from_seed(0).public_key();
600 let pk2 = PrivateKey::from_seed(1).public_key();
601
602 let mut manager = oracle.manager();
604 manager
605 .update(0, vec![pk1.clone(), pk2.clone()].try_into().unwrap())
606 .await;
607
608 oracle
610 .add_link(
611 pk1.clone(),
612 pk2.clone(),
613 Link {
614 latency: Duration::ZERO,
615 jitter: Duration::ZERO,
616 success_rate: 1.0,
617 },
618 )
619 .await
620 .unwrap();
621
622 let (mut sender1, _receiver1) = oracle
624 .control(pk1.clone())
625 .register(0, TEST_QUOTA)
626 .await
627 .unwrap();
628 let (_, mut receiver2) = oracle
629 .control(pk2.clone())
630 .register(0, TEST_QUOTA)
631 .await
632 .unwrap();
633
634 let msg1 = Bytes::from("link-before-register-1");
636 sender1
637 .send(Recipients::One(pk2.clone()), msg1.clone(), false)
638 .await
639 .unwrap();
640 let (from, received) = receiver2.recv().await.unwrap();
641 assert_eq!(from, pk1);
642 assert_eq!(received, msg1);
643 });
644 }
645
646 #[test]
647 fn test_simple_message_delivery() {
648 let executor = deterministic::Runner::default();
649 executor.start(|context| async move {
650 let (network, mut oracle) = Network::new(
652 context.with_label("network"),
653 Config {
654 max_size: 1024 * 1024,
655 disconnect_on_block: true,
656 tracked_peer_sets: None,
657 },
658 );
659
660 network.start();
662
663 let pk1 = PrivateKey::from_seed(0).public_key();
665 let pk2 = PrivateKey::from_seed(1).public_key();
666 let (mut sender1, mut receiver1) = oracle
667 .control(pk1.clone())
668 .register(0, TEST_QUOTA)
669 .await
670 .unwrap();
671 let (mut sender2, mut receiver2) = oracle
672 .control(pk2.clone())
673 .register(0, TEST_QUOTA)
674 .await
675 .unwrap();
676
677 let _ = oracle
679 .control(pk1.clone())
680 .register(1, TEST_QUOTA)
681 .await
682 .unwrap();
683 let _ = oracle
684 .control(pk2.clone())
685 .register(2, TEST_QUOTA)
686 .await
687 .unwrap();
688
689 oracle
691 .add_link(
692 pk1.clone(),
693 pk2.clone(),
694 Link {
695 latency: Duration::from_millis(5),
696 jitter: Duration::from_millis(2),
697 success_rate: 1.0,
698 },
699 )
700 .await
701 .unwrap();
702 oracle
703 .add_link(
704 pk2.clone(),
705 pk1.clone(),
706 Link {
707 latency: Duration::from_millis(5),
708 jitter: Duration::from_millis(2),
709 success_rate: 1.0,
710 },
711 )
712 .await
713 .unwrap();
714
715 let msg1 = Bytes::from("hello from pk1");
717 let msg2 = Bytes::from("hello from pk2");
718 sender1
719 .send(Recipients::One(pk2.clone()), msg1.clone(), false)
720 .await
721 .unwrap();
722 sender2
723 .send(Recipients::One(pk1.clone()), msg2.clone(), false)
724 .await
725 .unwrap();
726
727 let (sender, message) = receiver1.recv().await.unwrap();
729 assert_eq!(sender, pk2);
730 assert_eq!(message, msg2);
731 let (sender, message) = receiver2.recv().await.unwrap();
732 assert_eq!(sender, pk1);
733 assert_eq!(message, msg1);
734 });
735 }
736
737 #[test]
738 fn test_send_wrong_channel() {
739 let executor = deterministic::Runner::default();
740 executor.start(|context| async move {
741 let (network, mut oracle) = Network::new(
743 context.with_label("network"),
744 Config {
745 max_size: 1024 * 1024,
746 disconnect_on_block: true,
747 tracked_peer_sets: None,
748 },
749 );
750
751 network.start();
753
754 let pk1 = PrivateKey::from_seed(0).public_key();
756 let pk2 = PrivateKey::from_seed(1).public_key();
757 let (mut sender1, _) = oracle
758 .control(pk1.clone())
759 .register(0, TEST_QUOTA)
760 .await
761 .unwrap();
762 let (_, mut receiver2) = oracle
763 .control(pk2.clone())
764 .register(1, TEST_QUOTA)
765 .await
766 .unwrap();
767
768 oracle
770 .add_link(
771 pk1,
772 pk2.clone(),
773 Link {
774 latency: Duration::from_millis(5),
775 jitter: Duration::ZERO,
776 success_rate: 1.0,
777 },
778 )
779 .await
780 .unwrap();
781
782 let msg = Bytes::from("hello from pk1");
784 sender1
785 .send(Recipients::One(pk2), msg, false)
786 .await
787 .unwrap();
788
789 select! {
791 _ = receiver2.recv() => {
792 panic!("unexpected message");
793 },
794 _ = context.sleep(Duration::from_secs(1)) => {},
795 }
796 });
797 }
798
799 #[test]
800 fn test_dynamic_peers() {
801 let executor = deterministic::Runner::default();
802 executor.start(|context| async move {
803 let (network, mut oracle) = Network::new(
805 context.with_label("network"),
806 Config {
807 max_size: 1024 * 1024,
808 disconnect_on_block: true,
809 tracked_peer_sets: None,
810 },
811 );
812
813 network.start();
815
816 let pk1 = PrivateKey::from_seed(0).public_key();
818 let pk2 = PrivateKey::from_seed(1).public_key();
819 let (mut sender1, mut receiver1) = oracle
820 .control(pk1.clone())
821 .register(0, TEST_QUOTA)
822 .await
823 .unwrap();
824 let (mut sender2, mut receiver2) = oracle
825 .control(pk2.clone())
826 .register(0, TEST_QUOTA)
827 .await
828 .unwrap();
829
830 oracle
832 .add_link(
833 pk1.clone(),
834 pk2.clone(),
835 Link {
836 latency: Duration::from_millis(5),
837 jitter: Duration::from_millis(2),
838 success_rate: 1.0,
839 },
840 )
841 .await
842 .unwrap();
843 oracle
844 .add_link(
845 pk2.clone(),
846 pk1.clone(),
847 Link {
848 latency: Duration::from_millis(5),
849 jitter: Duration::from_millis(2),
850 success_rate: 1.0,
851 },
852 )
853 .await
854 .unwrap();
855
856 let msg1 = Bytes::from("attempt 1: hello from pk1");
858 let msg2 = Bytes::from("attempt 1: hello from pk2");
859 sender1
860 .send(Recipients::One(pk2.clone()), msg1.clone(), false)
861 .await
862 .unwrap();
863 sender2
864 .send(Recipients::One(pk1.clone()), msg2.clone(), false)
865 .await
866 .unwrap();
867
868 let (sender, message) = receiver1.recv().await.unwrap();
870 assert_eq!(sender, pk2);
871 assert_eq!(message, msg2);
872 let (sender, message) = receiver2.recv().await.unwrap();
873 assert_eq!(sender, pk1);
874 assert_eq!(message, msg1);
875 });
876 }
877
878 #[test]
879 fn test_dynamic_links() {
880 let executor = deterministic::Runner::default();
881 executor.start(|context| async move {
882 let (network, mut oracle) = Network::new(
884 context.with_label("network"),
885 Config {
886 max_size: 1024 * 1024,
887 disconnect_on_block: true,
888 tracked_peer_sets: None,
889 },
890 );
891
892 network.start();
894
895 let pk1 = PrivateKey::from_seed(0).public_key();
897 let pk2 = PrivateKey::from_seed(1).public_key();
898 let (mut sender1, mut receiver1) = oracle
899 .control(pk1.clone())
900 .register(0, TEST_QUOTA)
901 .await
902 .unwrap();
903 let (mut sender2, mut receiver2) = oracle
904 .control(pk2.clone())
905 .register(0, TEST_QUOTA)
906 .await
907 .unwrap();
908
909 let msg1 = Bytes::from("attempt 1: hello from pk1");
911 let msg2 = Bytes::from("attempt 1: hello from pk2");
912 sender1
913 .send(Recipients::One(pk2.clone()), msg1.clone(), false)
914 .await
915 .unwrap();
916 sender2
917 .send(Recipients::One(pk1.clone()), msg2.clone(), false)
918 .await
919 .unwrap();
920
921 select! {
923 _ = receiver1.recv() => {
924 panic!("unexpected message");
925 },
926 _ = receiver2.recv() => {
927 panic!("unexpected message");
928 },
929 _ = context.sleep(Duration::from_secs(1)) => {},
930 }
931
932 oracle
934 .add_link(
935 pk1.clone(),
936 pk2.clone(),
937 Link {
938 latency: Duration::from_millis(5),
939 jitter: Duration::from_millis(2),
940 success_rate: 1.0,
941 },
942 )
943 .await
944 .unwrap();
945 oracle
946 .add_link(
947 pk2.clone(),
948 pk1.clone(),
949 Link {
950 latency: Duration::from_millis(5),
951 jitter: Duration::from_millis(2),
952 success_rate: 1.0,
953 },
954 )
955 .await
956 .unwrap();
957
958 let msg1 = Bytes::from("attempt 2: hello from pk1");
960 let msg2 = Bytes::from("attempt 2: hello from pk2");
961 sender1
962 .send(Recipients::One(pk2.clone()), msg1.clone(), false)
963 .await
964 .unwrap();
965 sender2
966 .send(Recipients::One(pk1.clone()), msg2.clone(), false)
967 .await
968 .unwrap();
969
970 let (sender, message) = receiver1.recv().await.unwrap();
972 assert_eq!(sender, pk2);
973 assert_eq!(message, msg2);
974 let (sender, message) = receiver2.recv().await.unwrap();
975 assert_eq!(sender, pk1);
976 assert_eq!(message, msg1);
977
978 oracle.remove_link(pk1.clone(), pk2.clone()).await.unwrap();
980 oracle.remove_link(pk2.clone(), pk1.clone()).await.unwrap();
981
982 let msg1 = Bytes::from("attempt 3: hello from pk1");
984 let msg2 = Bytes::from("attempt 3: hello from pk2");
985 sender1
986 .send(Recipients::One(pk2.clone()), msg1.clone(), false)
987 .await
988 .unwrap();
989 sender2
990 .send(Recipients::One(pk1.clone()), msg2.clone(), false)
991 .await
992 .unwrap();
993
994 select! {
996 _ = receiver1.recv() => {
997 panic!("unexpected message");
998 },
999 _ = receiver2.recv() => {
1000 panic!("unexpected message");
1001 },
1002 _ = context.sleep(Duration::from_secs(1)) => {},
1003 }
1004
1005 let result = oracle.remove_link(pk1, pk2).await;
1007 assert!(matches!(result, Err(Error::LinkMissing)));
1008 });
1009 }
1010
1011 async fn test_bandwidth_between_peers(
1012 context: &mut deterministic::Context,
1013 oracle: &mut Oracle<PublicKey, deterministic::Context>,
1014 sender_bps: Option<usize>,
1015 receiver_bps: Option<usize>,
1016 message_size: usize,
1017 expected_duration_ms: u64,
1018 ) {
1019 let pk1 = PrivateKey::from_seed(context.gen::<u64>()).public_key();
1021 let pk2 = PrivateKey::from_seed(context.gen::<u64>()).public_key();
1022 let (mut sender, _) = oracle
1023 .control(pk1.clone())
1024 .register(0, TEST_QUOTA)
1025 .await
1026 .unwrap();
1027 let (_, mut receiver) = oracle
1028 .control(pk2.clone())
1029 .register(0, TEST_QUOTA)
1030 .await
1031 .unwrap();
1032
1033 oracle
1035 .limit_bandwidth(pk1.clone(), sender_bps, None)
1036 .await
1037 .unwrap();
1038 oracle
1039 .limit_bandwidth(pk2.clone(), None, receiver_bps)
1040 .await
1041 .unwrap();
1042
1043 oracle
1045 .add_link(
1046 pk1.clone(),
1047 pk2.clone(),
1048 Link {
1049 latency: Duration::ZERO,
1051 jitter: Duration::ZERO,
1052 success_rate: 1.0,
1053 },
1054 )
1055 .await
1056 .unwrap();
1057
1058 let msg = Bytes::from(vec![42u8; message_size]);
1060 let start = context.current();
1061 sender
1062 .send(Recipients::One(pk2.clone()), msg.clone(), true)
1063 .await
1064 .unwrap();
1065
1066 let (origin, received) = receiver.recv().await.unwrap();
1068 let elapsed = context.current().duration_since(start).unwrap();
1069
1070 assert_eq!(origin, pk1);
1071 assert_eq!(received, msg);
1072 assert!(
1073 elapsed >= Duration::from_millis(expected_duration_ms),
1074 "Message arrived too quickly: {elapsed:?} (expected >= {expected_duration_ms}ms)"
1075 );
1076 assert!(
1077 elapsed < Duration::from_millis(expected_duration_ms + 100),
1078 "Message took too long: {elapsed:?} (expected ~{expected_duration_ms}ms)"
1079 );
1080 }
1081
1082 #[test]
1083 fn test_bandwidth() {
1084 let executor = deterministic::Runner::default();
1085 executor.start(|mut context| async move {
1086 let (network, mut oracle) = Network::new(
1087 context.with_label("network"),
1088 Config {
1089 max_size: 1024 * 1024,
1090 disconnect_on_block: true,
1091 tracked_peer_sets: None,
1092 },
1093 );
1094 network.start();
1095
1096 test_bandwidth_between_peers(
1099 &mut context,
1100 &mut oracle,
1101 Some(1000), Some(1000), 500, 500, )
1106 .await;
1107
1108 test_bandwidth_between_peers(
1112 &mut context,
1113 &mut oracle,
1114 Some(500), Some(2000), 250, 500, )
1119 .await;
1120
1121 test_bandwidth_between_peers(
1125 &mut context,
1126 &mut oracle,
1127 Some(2000), Some(500), 250, 500, )
1132 .await;
1133
1134 test_bandwidth_between_peers(
1138 &mut context,
1139 &mut oracle,
1140 None, Some(1000), 500, 500, )
1145 .await;
1146
1147 test_bandwidth_between_peers(
1151 &mut context,
1152 &mut oracle,
1153 Some(1000), None, 500, 500, )
1158 .await;
1159
1160 test_bandwidth_between_peers(
1163 &mut context,
1164 &mut oracle,
1165 None, None, 500, 0, )
1170 .await;
1171 });
1172 }
1173
1174 #[test]
1175 fn test_bandwidth_contention() {
1176 let executor = deterministic::Runner::default();
1178 executor.start(|context| async move {
1179 let (network, mut oracle) = Network::new(
1180 context.with_label("network"),
1181 Config {
1182 max_size: 1024 * 1024,
1183 disconnect_on_block: true,
1184 tracked_peer_sets: None,
1185 },
1186 );
1187 network.start();
1188
1189 const NUM_PEERS: usize = 100;
1191 const MESSAGE_SIZE: usize = 1000; const EFFECTIVE_BPS: usize = 10_000; let mut peers = Vec::with_capacity(NUM_PEERS + 1);
1196 let mut senders = Vec::with_capacity(NUM_PEERS + 1);
1197 let mut receivers = Vec::with_capacity(NUM_PEERS + 1);
1198
1199 for i in 0..=NUM_PEERS {
1201 let pk = PrivateKey::from_seed(i as u64).public_key();
1202 let (sender, receiver) = oracle
1203 .control(pk.clone())
1204 .register(0, TEST_QUOTA)
1205 .await
1206 .unwrap();
1207 peers.push(pk);
1208 senders.push(sender);
1209 receivers.push(receiver);
1210 }
1211
1212 for pk in &peers {
1214 oracle
1215 .limit_bandwidth(pk.clone(), Some(EFFECTIVE_BPS), Some(EFFECTIVE_BPS))
1216 .await
1217 .unwrap();
1218 }
1219
1220 for peer in peers.iter().skip(1) {
1222 oracle
1223 .add_link(
1224 peer.clone(),
1225 peers[0].clone(),
1226 Link {
1227 latency: Duration::ZERO,
1228 jitter: Duration::ZERO,
1229 success_rate: 1.0,
1230 },
1231 )
1232 .await
1233 .unwrap();
1234 oracle
1235 .add_link(
1236 peers[0].clone(),
1237 peer.clone(),
1238 Link {
1239 latency: Duration::ZERO,
1240 jitter: Duration::ZERO,
1241 success_rate: 1.0,
1242 },
1243 )
1244 .await
1245 .unwrap();
1246 }
1247
1248 let start = context.current();
1251
1252 let msg = Bytes::from(vec![0u8; MESSAGE_SIZE]);
1255 for peer in peers.iter().skip(1) {
1256 senders[0]
1257 .send(Recipients::One(peer.clone()), msg.clone(), true)
1258 .await
1259 .unwrap();
1260 }
1261
1262 for receiver in receivers.iter_mut().skip(1) {
1264 let (origin, received) = receiver.recv().await.unwrap();
1265 assert_eq!(origin, peers[0]);
1266 assert_eq!(received.len(), MESSAGE_SIZE);
1267 }
1268
1269 let elapsed = context.current().duration_since(start).unwrap();
1270
1271 let expected_ms = (NUM_PEERS * MESSAGE_SIZE * 1000) / EFFECTIVE_BPS;
1273
1274 assert!(
1275 elapsed >= Duration::from_millis(expected_ms as u64),
1276 "One-to-many completed too quickly: {elapsed:?} (expected >= {expected_ms}ms)"
1277 );
1278 assert!(
1279 elapsed < Duration::from_millis((expected_ms as u64) + 500),
1280 "One-to-many took too long: {elapsed:?} (expected ~{expected_ms}ms)"
1281 );
1282
1283 let start = context.current();
1285
1286 let msg = Bytes::from(vec![0; MESSAGE_SIZE]);
1289 for mut sender in senders.into_iter().skip(1) {
1290 sender
1291 .send(Recipients::One(peers[0].clone()), msg.clone(), true)
1292 .await
1293 .unwrap();
1294 }
1295
1296 let mut received_from = HashSet::new();
1298 for _ in 1..=NUM_PEERS {
1299 let (origin, received) = receivers[0].recv().await.unwrap();
1300 assert_eq!(received.len(), MESSAGE_SIZE);
1301 assert!(
1302 received_from.insert(origin.clone()),
1303 "Received duplicate from {origin:?}"
1304 );
1305 }
1306
1307 let elapsed = context.current().duration_since(start).unwrap();
1308
1309 let expected_ms = (NUM_PEERS * MESSAGE_SIZE * 1000) / EFFECTIVE_BPS;
1311
1312 assert!(
1313 elapsed >= Duration::from_millis(expected_ms as u64),
1314 "Many-to-one completed too quickly: {elapsed:?} (expected >= {expected_ms}ms)"
1315 );
1316 assert!(
1317 elapsed < Duration::from_millis((expected_ms as u64) + 500),
1318 "Many-to-one took too long: {elapsed:?} (expected ~{expected_ms}ms)"
1319 );
1320
1321 assert_eq!(received_from.len(), NUM_PEERS);
1323 for peer in peers.iter().skip(1) {
1324 assert!(received_from.contains(peer));
1325 }
1326 });
1327 }
1328
1329 #[test]
1330 fn test_message_ordering() {
1331 let executor = deterministic::Runner::default();
1333 executor.start(|context| async move {
1334 let (network, mut oracle) = Network::new(
1335 context.with_label("network"),
1336 Config {
1337 max_size: 1024 * 1024,
1338 disconnect_on_block: true,
1339 tracked_peer_sets: None,
1340 },
1341 );
1342 network.start();
1343
1344 let pk1 = PrivateKey::from_seed(1).public_key();
1346 let pk2 = PrivateKey::from_seed(2).public_key();
1347 let (mut sender, _) = oracle
1348 .control(pk1.clone())
1349 .register(0, TEST_QUOTA)
1350 .await
1351 .unwrap();
1352 let (_, mut receiver) = oracle
1353 .control(pk2.clone())
1354 .register(0, TEST_QUOTA)
1355 .await
1356 .unwrap();
1357
1358 oracle
1360 .add_link(
1361 pk1.clone(),
1362 pk2.clone(),
1363 Link {
1364 latency: Duration::from_millis(50),
1365 jitter: Duration::from_millis(40),
1366 success_rate: 1.0,
1367 },
1368 )
1369 .await
1370 .unwrap();
1371
1372 let messages = vec![
1374 Bytes::from("message 1"),
1375 Bytes::from("message 2"),
1376 Bytes::from("message 3"),
1377 Bytes::from("message 4"),
1378 Bytes::from("message 5"),
1379 ];
1380
1381 for msg in messages.clone() {
1382 sender
1383 .send(Recipients::One(pk2.clone()), msg, true)
1384 .await
1385 .unwrap();
1386 }
1387
1388 for expected_msg in messages {
1390 let (origin, received_msg) = receiver.recv().await.unwrap();
1391 assert_eq!(origin, pk1);
1392 assert_eq!(received_msg, expected_msg);
1393 }
1394 })
1395 }
1396
1397 #[test]
1398 fn test_high_latency_message_blocks_followup() {
1399 let executor = deterministic::Runner::default();
1400 executor.start(|context| async move {
1401 let (network, mut oracle) = Network::new(
1402 context.with_label("network"),
1403 Config {
1404 max_size: 1024 * 1024,
1405 disconnect_on_block: true,
1406 tracked_peer_sets: None,
1407 },
1408 );
1409 network.start();
1410
1411 let pk1 = PrivateKey::from_seed(1).public_key();
1412 let pk2 = PrivateKey::from_seed(2).public_key();
1413 let (mut sender, _) = oracle.control(pk1.clone()).register(0, TEST_QUOTA).await.unwrap();
1414 let (_, mut receiver) = oracle.control(pk2.clone()).register(0, TEST_QUOTA).await.unwrap();
1415
1416 const BPS: usize = 1_000;
1417 oracle
1418 .limit_bandwidth(pk1.clone(), Some(BPS), None)
1419 .await
1420 .unwrap();
1421 oracle
1422 .limit_bandwidth(pk2.clone(), None, Some(BPS))
1423 .await
1424 .unwrap();
1425
1426 oracle
1428 .add_link(
1429 pk1.clone(),
1430 pk2.clone(),
1431 Link {
1432 latency: Duration::from_millis(5_000),
1433 jitter: Duration::ZERO,
1434 success_rate: 1.0,
1435 },
1436 )
1437 .await
1438 .unwrap();
1439
1440 let slow = Bytes::from(vec![0u8; 1_000]);
1441 sender
1442 .send(Recipients::One(pk2.clone()), slow.clone(), true)
1443 .await
1444 .unwrap();
1445
1446 oracle.remove_link(pk1.clone(), pk2.clone()).await.unwrap();
1448 oracle
1449 .add_link(
1450 pk1.clone(),
1451 pk2.clone(),
1452 Link {
1453 latency: Duration::from_millis(1),
1454 jitter: Duration::ZERO,
1455 success_rate: 1.0,
1456 },
1457 )
1458 .await
1459 .unwrap();
1460
1461 let fast = Bytes::from(vec![1u8; 1_000]);
1463 sender
1464 .send(Recipients::One(pk2.clone()), fast.clone(), true)
1465 .await
1466 .unwrap();
1467
1468 let start = context.current();
1469 let (origin1, message1) = receiver.recv().await.unwrap();
1470 assert_eq!(origin1, pk1);
1471 assert_eq!(message1, slow);
1472 let first_elapsed = context.current().duration_since(start).unwrap();
1473
1474 let (origin2, message2) = receiver.recv().await.unwrap();
1475 let second_elapsed = context.current().duration_since(start).unwrap();
1476 assert_eq!(origin2, pk1);
1477 assert_eq!(message2, fast);
1478
1479 let egress_time = Duration::from_secs(1);
1480 let slow_latency = Duration::from_millis(5_000);
1481 let expected_first = egress_time + slow_latency;
1482 let tolerance = Duration::from_millis(10);
1483 assert!(
1484 first_elapsed >= expected_first.saturating_sub(tolerance)
1485 && first_elapsed <= expected_first + tolerance,
1486 "slow message arrived outside expected window: {first_elapsed:?} (expected {expected_first:?} ± {tolerance:?})"
1487 );
1488 assert!(
1489 second_elapsed >= first_elapsed,
1490 "fast message arrived before slow transmission completed"
1491 );
1492
1493 let arrival_gap = second_elapsed
1494 .checked_sub(first_elapsed)
1495 .expect("timestamps ordered");
1496 assert!(
1497 arrival_gap >= egress_time.saturating_sub(tolerance)
1498 && arrival_gap <= egress_time + tolerance,
1499 "next arrival deviated from transmit duration (gap = {arrival_gap:?}, expected {egress_time:?} ± {tolerance:?})"
1500 );
1501 })
1502 }
1503
1504 #[test]
1505 fn test_many_to_one_bandwidth_sharing() {
1506 let executor = deterministic::Runner::default();
1507 executor.start(|context| async move {
1508 let (network, mut oracle) = Network::new(
1509 context.with_label("network"),
1510 Config {
1511 max_size: 1024 * 1024,
1512 disconnect_on_block: true,
1513 tracked_peer_sets: None,
1514 },
1515 );
1516 network.start();
1517
1518 let mut senders = Vec::new();
1520 let mut sender_txs = Vec::new();
1521 for i in 0..10 {
1522 let sender = ed25519::PrivateKey::from_seed(i).public_key();
1523 senders.push(sender.clone());
1524 let (tx, _) = oracle
1525 .control(sender.clone())
1526 .register(0, TEST_QUOTA)
1527 .await
1528 .unwrap();
1529 sender_txs.push(tx);
1530
1531 oracle
1533 .limit_bandwidth(sender.clone(), Some(10_000), None)
1534 .await
1535 .unwrap();
1536 }
1537
1538 let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1539 let (_, mut receiver_rx) = oracle
1540 .control(receiver.clone())
1541 .register(0, TEST_QUOTA)
1542 .await
1543 .unwrap();
1544
1545 oracle
1547 .limit_bandwidth(receiver.clone(), None, Some(100_000))
1548 .await
1549 .unwrap();
1550
1551 for sender in &senders {
1553 oracle
1554 .add_link(
1555 sender.clone(),
1556 receiver.clone(),
1557 Link {
1558 latency: Duration::ZERO,
1559 jitter: Duration::ZERO,
1560 success_rate: 1.0,
1561 },
1562 )
1563 .await
1564 .unwrap();
1565 }
1566
1567 let start = context.current();
1568
1569 for (i, mut tx) in sender_txs.into_iter().enumerate() {
1571 let receiver_clone = receiver.clone();
1572 let msg = Bytes::from(vec![i as u8; 10_000]);
1573 tx.send(Recipients::One(receiver_clone), msg, true)
1574 .await
1575 .unwrap();
1576 }
1577
1578 for i in 0..10 {
1581 let (_, _msg) = receiver_rx.recv().await.unwrap();
1582 let recv_time = context.current().duration_since(start).unwrap();
1583
1584 assert!(
1586 recv_time >= Duration::from_millis(950)
1587 && recv_time <= Duration::from_millis(1100),
1588 "Message {i} received at {recv_time:?}, expected ~1s",
1589 );
1590 }
1591 });
1592 }
1593
1594 #[test]
1595 fn test_one_to_many_fast_sender() {
1596 let executor = deterministic::Runner::default();
1599 executor.start(|context| async move {
1600 let (network, mut oracle) = Network::new(
1601 context.with_label("network"),
1602 Config {
1603 max_size: 1024 * 1024,
1604 disconnect_on_block: true,
1605 tracked_peer_sets: None,
1606 },
1607 );
1608 network.start();
1609
1610 let sender = ed25519::PrivateKey::from_seed(0).public_key();
1612 let (sender_tx, _) = oracle
1613 .control(sender.clone())
1614 .register(0, TEST_QUOTA)
1615 .await
1616 .unwrap();
1617
1618 oracle
1620 .limit_bandwidth(sender.clone(), Some(100_000), None)
1621 .await
1622 .unwrap();
1623
1624 let mut receivers = Vec::new();
1626 let mut receiver_rxs = Vec::new();
1627 for i in 0..10 {
1628 let receiver = ed25519::PrivateKey::from_seed(i + 1).public_key();
1629 receivers.push(receiver.clone());
1630 let (_, rx) = oracle
1631 .control(receiver.clone())
1632 .register(0, TEST_QUOTA)
1633 .await
1634 .unwrap();
1635 receiver_rxs.push(rx);
1636
1637 oracle
1639 .limit_bandwidth(receiver.clone(), None, Some(10_000))
1640 .await
1641 .unwrap();
1642
1643 oracle
1645 .add_link(
1646 sender.clone(),
1647 receiver.clone(),
1648 Link {
1649 latency: Duration::ZERO,
1650 jitter: Duration::ZERO,
1651 success_rate: 1.0,
1652 },
1653 )
1654 .await
1655 .unwrap();
1656 }
1657
1658 let start = context.current();
1659
1660 for (i, receiver) in receivers.iter().enumerate() {
1662 let mut sender_tx = sender_tx.clone();
1663 let receiver_clone = receiver.clone();
1664 let msg = Bytes::from(vec![i as u8; 10_000]);
1665 sender_tx
1666 .send(Recipients::One(receiver_clone), msg, true)
1667 .await
1668 .unwrap();
1669 }
1670
1671 for (i, mut rx) in receiver_rxs.into_iter().enumerate() {
1673 let (_, msg) = rx.recv().await.unwrap();
1674 assert_eq!(msg[0], i as u8);
1675 let recv_time = context.current().duration_since(start).unwrap();
1676
1677 assert!(
1679 recv_time >= Duration::from_millis(950)
1680 && recv_time <= Duration::from_millis(1100),
1681 "Receiver {i} received at {recv_time:?}, expected ~1s",
1682 );
1683 }
1684 });
1685 }
1686
1687 #[test]
1688 fn test_many_slow_senders_to_fast_receiver() {
1689 let executor = deterministic::Runner::default();
1692 executor.start(|context| async move {
1693 let (network, mut oracle) = Network::new(
1694 context.with_label("network"),
1695 Config {
1696 max_size: 1024 * 1024,
1697 disconnect_on_block: true,
1698 tracked_peer_sets: None,
1699 },
1700 );
1701 network.start();
1702
1703 let mut senders = Vec::new();
1705 let mut sender_txs = Vec::new();
1706 for i in 0..10 {
1707 let sender = ed25519::PrivateKey::from_seed(i).public_key();
1708 senders.push(sender.clone());
1709 let (tx, _) = oracle
1710 .control(sender.clone())
1711 .register(0, TEST_QUOTA)
1712 .await
1713 .unwrap();
1714 sender_txs.push(tx);
1715
1716 oracle
1718 .limit_bandwidth(sender.clone(), Some(1_000), None)
1719 .await
1720 .unwrap();
1721 }
1722
1723 let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1725 let (_, mut receiver_rx) = oracle
1726 .control(receiver.clone())
1727 .register(0, TEST_QUOTA)
1728 .await
1729 .unwrap();
1730
1731 oracle
1733 .limit_bandwidth(receiver.clone(), None, Some(10_000))
1734 .await
1735 .unwrap();
1736
1737 for sender in &senders {
1739 oracle
1740 .add_link(
1741 sender.clone(),
1742 receiver.clone(),
1743 Link {
1744 latency: Duration::ZERO,
1745 jitter: Duration::ZERO,
1746 success_rate: 1.0,
1747 },
1748 )
1749 .await
1750 .unwrap();
1751 }
1752
1753 let start = context.current();
1754
1755 for (i, mut tx) in sender_txs.into_iter().enumerate() {
1757 let receiver_clone = receiver.clone();
1758 let msg = Bytes::from(vec![i as u8; 1_000]);
1759 tx.send(Recipients::One(receiver_clone), msg, true)
1760 .await
1761 .unwrap();
1762 }
1763
1764 for i in 0..10 {
1770 let (_, _msg) = receiver_rx.recv().await.unwrap();
1771 let recv_time = context.current().duration_since(start).unwrap();
1772
1773 assert!(
1775 recv_time >= Duration::from_millis(950)
1776 && recv_time <= Duration::from_millis(1100),
1777 "Message {i} received at {recv_time:?}, expected ~1s",
1778 );
1779 }
1780 });
1781 }
1782
1783 #[test]
1784 fn test_dynamic_bandwidth_allocation_staggered() {
1785 let executor = deterministic::Runner::default();
1791 executor.start(|context| async move {
1792 let (network, mut oracle) = Network::new(
1793 context.with_label("network"),
1794 Config {
1795 max_size: 1024 * 1024,
1796 disconnect_on_block: true,
1797 tracked_peer_sets: None,
1798 },
1799 );
1800 network.start();
1801
1802 let mut senders = Vec::new();
1804 let mut sender_txs = Vec::new();
1805 for i in 0..3 {
1806 let sender = ed25519::PrivateKey::from_seed(i).public_key();
1807 senders.push(sender.clone());
1808 let (tx, _) = oracle
1809 .control(sender.clone())
1810 .register(0, TEST_QUOTA)
1811 .await
1812 .unwrap();
1813 sender_txs.push(tx);
1814
1815 oracle
1817 .limit_bandwidth(sender.clone(), Some(30_000), None)
1818 .await
1819 .unwrap();
1820 }
1821
1822 let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1824 let (_, mut receiver_rx) = oracle
1825 .control(receiver.clone())
1826 .register(0, TEST_QUOTA)
1827 .await
1828 .unwrap();
1829 oracle
1830 .limit_bandwidth(receiver.clone(), None, Some(30_000))
1831 .await
1832 .unwrap();
1833
1834 for sender in &senders {
1836 oracle
1837 .add_link(
1838 sender.clone(),
1839 receiver.clone(),
1840 Link {
1841 latency: Duration::from_millis(1),
1842 jitter: Duration::ZERO,
1843 success_rate: 1.0,
1844 },
1845 )
1846 .await
1847 .unwrap();
1848 }
1849
1850 let start = context.current();
1851
1852 let mut tx0 = sender_txs[0].clone();
1856 let rx_clone = receiver.clone();
1857 context.clone().spawn(move |_| async move {
1858 let msg = Bytes::from(vec![0u8; 30_000]);
1859 tx0.send(Recipients::One(rx_clone), msg, true)
1860 .await
1861 .unwrap();
1862 });
1863
1864 let mut tx1 = sender_txs[1].clone();
1868 let rx_clone = receiver.clone();
1869 context.clone().spawn(move |context| async move {
1870 context.sleep(Duration::from_millis(500)).await;
1871 let msg = Bytes::from(vec![1u8; 30_000]);
1872 tx1.send(Recipients::One(rx_clone), msg, true)
1873 .await
1874 .unwrap();
1875 });
1876
1877 let mut tx2 = sender_txs[2].clone();
1880 let rx_clone = receiver.clone();
1881 context.clone().spawn(move |context| async move {
1882 context.sleep(Duration::from_millis(1500)).await;
1883 let msg = Bytes::from(vec![2u8; 15_000]);
1884 tx2.send(Recipients::One(rx_clone), msg, true)
1885 .await
1886 .unwrap();
1887 });
1888
1889 let (_, msg0) = receiver_rx.recv().await.unwrap();
1893 assert_eq!(msg0[0], 0);
1894 let t0 = context.current().duration_since(start).unwrap();
1895 assert!(
1896 t0 >= Duration::from_millis(1490) && t0 <= Duration::from_millis(1600),
1897 "Message 0 received at {t0:?}, expected ~1.5s",
1898 );
1899
1900 let (_, msg_a) = receiver_rx.recv().await.unwrap();
1904 let t_a = context.current().duration_since(start).unwrap();
1905
1906 let (_, msg_b) = receiver_rx.recv().await.unwrap();
1907 let t_b = context.current().duration_since(start).unwrap();
1908
1909 let (msg1, t1, msg2, t2) = if msg_a[0] == 1 {
1911 (msg_a, t_a, msg_b, t_b)
1912 } else {
1913 (msg_b, t_b, msg_a, t_a)
1914 };
1915
1916 assert_eq!(msg1[0], 1);
1917 assert_eq!(msg2[0], 2);
1918
1919 assert!(
1924 t1 >= Duration::from_millis(1500) && t1 <= Duration::from_millis(2600),
1925 "Message 1 received at {t1:?}, expected between 1.5s-2.6s",
1926 );
1927
1928 assert!(
1929 t2 >= Duration::from_millis(1500) && t2 <= Duration::from_millis(2600),
1930 "Message 2 received at {t2:?}, expected between 1.5s-2.6s",
1931 );
1932 });
1933 }
1934
1935 #[test]
1936 fn test_dynamic_bandwidth_varied_sizes() {
1937 let executor = deterministic::Runner::default();
1940 executor.start(|context| async move {
1941 let (network, mut oracle) = Network::new(
1942 context.with_label("network"),
1943 Config {
1944 max_size: 1024 * 1024,
1945 disconnect_on_block: true,
1946 tracked_peer_sets: None,
1947 },
1948 );
1949 network.start();
1950
1951 let mut senders = Vec::new();
1953 let mut sender_txs = Vec::new();
1954 for i in 0..3 {
1955 let sender = ed25519::PrivateKey::from_seed(i).public_key();
1956 senders.push(sender.clone());
1957 let (tx, _) = oracle
1958 .control(sender.clone())
1959 .register(0, TEST_QUOTA)
1960 .await
1961 .unwrap();
1962 sender_txs.push(tx);
1963
1964 oracle
1966 .limit_bandwidth(sender.clone(), None, None)
1967 .await
1968 .unwrap();
1969 }
1970
1971 let receiver = ed25519::PrivateKey::from_seed(100).public_key();
1973 let (_, mut receiver_rx) = oracle
1974 .control(receiver.clone())
1975 .register(0, TEST_QUOTA)
1976 .await
1977 .unwrap();
1978 oracle
1979 .limit_bandwidth(receiver.clone(), None, Some(30_000))
1980 .await
1981 .unwrap();
1982
1983 for sender in &senders {
1985 oracle
1986 .add_link(
1987 sender.clone(),
1988 receiver.clone(),
1989 Link {
1990 latency: Duration::from_millis(1),
1991 jitter: Duration::ZERO,
1992 success_rate: 1.0,
1993 },
1994 )
1995 .await
1996 .unwrap();
1997 }
1998
1999 let start = context.current();
2000
2001 let sizes = [10_000, 20_000, 30_000];
2007 for (i, (mut tx, size)) in sender_txs.into_iter().zip(sizes.iter()).enumerate() {
2008 let rx_clone = receiver.clone();
2009 let msg_size = *size;
2010 let msg = Bytes::from(vec![i as u8; msg_size]);
2011 tx.send(Recipients::One(rx_clone), msg, true).await.unwrap();
2012 }
2013
2014 let mut messages = Vec::new();
2018 for _ in 0..3 {
2019 let (_, msg) = receiver_rx.recv().await.unwrap();
2020 let t = context.current().duration_since(start).unwrap();
2021 messages.push((msg[0] as usize, msg.len(), t));
2022 }
2023
2024 assert_eq!(messages.len(), 3);
2029
2030 let max_time = messages.iter().map(|&(_, _, t)| t).max().unwrap();
2032 assert!(
2033 max_time >= Duration::from_millis(2000),
2034 "Total time {max_time:?} should be at least 2s for 60KB at 30KB/s",
2035 );
2036 });
2037 }
2038
2039 #[test]
2040 fn test_bandwidth_pipe_reservation_duration() {
2041 let executor = deterministic::Runner::default();
2044 executor.start(|context| async move {
2045 let (network, mut oracle) = Network::new(
2046 context.with_label("network"),
2047 Config {
2048 max_size: 1024 * 1024,
2049 disconnect_on_block: true,
2050 tracked_peer_sets: None,
2051 },
2052 );
2053 network.start();
2054
2055 let sender = PrivateKey::from_seed(1).public_key();
2057 let receiver = PrivateKey::from_seed(2).public_key();
2058
2059 let (sender_tx, _) = oracle
2060 .control(sender.clone())
2061 .register(0, TEST_QUOTA)
2062 .await
2063 .unwrap();
2064 let (_, mut receiver_rx) = oracle
2065 .control(receiver.clone())
2066 .register(0, TEST_QUOTA)
2067 .await
2068 .unwrap();
2069
2070 oracle
2072 .limit_bandwidth(sender.clone(), Some(1000), None)
2073 .await
2074 .unwrap();
2075 oracle
2076 .limit_bandwidth(receiver.clone(), None, Some(1000))
2077 .await
2078 .unwrap();
2079
2080 oracle
2082 .add_link(
2083 sender.clone(),
2084 receiver.clone(),
2085 Link {
2086 latency: Duration::from_secs(1), jitter: Duration::ZERO,
2088 success_rate: 1.0,
2089 },
2090 )
2091 .await
2092 .unwrap();
2093
2094 let start = context.current();
2105
2106 for i in 0..3 {
2108 let mut sender_tx = sender_tx.clone();
2109 let receiver = receiver.clone();
2110 let msg = Bytes::from(vec![i; 500]);
2111 sender_tx
2112 .send(Recipients::One(receiver), msg, false)
2113 .await
2114 .unwrap();
2115 }
2116
2117 let mut receive_times = Vec::new();
2119 for i in 0..3 {
2120 let (_, received) = receiver_rx.recv().await.unwrap();
2121 receive_times.push(context.current().duration_since(start).unwrap());
2122 assert_eq!(received[0], i);
2123 }
2124
2125 for (i, time) in receive_times.iter().enumerate() {
2130 let expected_min = (i as u64 * 500) + 1500;
2131 let expected_max = expected_min + 100;
2132
2133 assert!(
2134 *time >= Duration::from_millis(expected_min)
2135 && *time < Duration::from_millis(expected_max),
2136 "Message {} should arrive at ~{}ms, got {:?}",
2137 i + 1,
2138 expected_min,
2139 time
2140 );
2141 }
2142 });
2143 }
2144
2145 #[test]
2146 fn test_dynamic_bandwidth_affects_new_transfers() {
2147 let executor = deterministic::Runner::default();
2150 executor.start(|context| async move {
2151 let (network, mut oracle) = Network::new(
2152 context.with_label("network"),
2153 Config {
2154 max_size: 1024 * 1024,
2155 disconnect_on_block: true,
2156 tracked_peer_sets: None,
2157 },
2158 );
2159 network.start();
2160
2161 let pk_sender = PrivateKey::from_seed(1).public_key();
2162 let pk_receiver = PrivateKey::from_seed(2).public_key();
2163
2164 let (mut sender_tx, _) = oracle
2166 .control(pk_sender.clone())
2167 .register(0, TEST_QUOTA)
2168 .await
2169 .unwrap();
2170 let (_, mut receiver_rx) = oracle
2171 .control(pk_receiver.clone())
2172 .register(0, TEST_QUOTA)
2173 .await
2174 .unwrap();
2175 oracle
2176 .add_link(
2177 pk_sender.clone(),
2178 pk_receiver.clone(),
2179 Link {
2180 latency: Duration::from_millis(1), jitter: Duration::ZERO,
2182 success_rate: 1.0,
2183 },
2184 )
2185 .await
2186 .unwrap();
2187
2188 oracle
2190 .limit_bandwidth(pk_sender.clone(), Some(10_000), None)
2191 .await
2192 .unwrap();
2193 oracle
2194 .limit_bandwidth(pk_receiver.clone(), None, Some(10_000))
2195 .await
2196 .unwrap();
2197
2198 let msg1 = Bytes::from(vec![1u8; 20_000]); let start_time = context.current();
2201 sender_tx
2202 .send(Recipients::One(pk_receiver.clone()), msg1.clone(), false)
2203 .await
2204 .unwrap();
2205
2206 let (_sender, received_msg1) = receiver_rx.recv().await.unwrap();
2208 let msg1_time = context.current().duration_since(start_time).unwrap();
2209 assert_eq!(received_msg1.len(), 20_000);
2210 assert!(
2211 msg1_time >= Duration::from_millis(1999)
2212 && msg1_time <= Duration::from_millis(2010),
2213 "First message should take ~2s, got {msg1_time:?}",
2214 );
2215
2216 oracle
2218 .limit_bandwidth(pk_sender.clone(), Some(2_000), None)
2219 .await
2220 .unwrap();
2221
2222 let msg2 = Bytes::from(vec![2u8; 10_000]); let msg2_start = context.current();
2225 sender_tx
2226 .send(Recipients::One(pk_receiver.clone()), msg2.clone(), false)
2227 .await
2228 .unwrap();
2229
2230 let (_sender, received_msg2) = receiver_rx.recv().await.unwrap();
2232 let msg2_time = context.current().duration_since(msg2_start).unwrap();
2233 assert_eq!(received_msg2.len(), 10_000);
2234 assert!(
2235 msg2_time >= Duration::from_millis(4999)
2236 && msg2_time <= Duration::from_millis(5010),
2237 "Second message should take ~5s at reduced bandwidth, got {msg2_time:?}",
2238 );
2239 });
2240 }
2241
2242 #[test]
2243 fn test_zero_receiver_ingress_bandwidth() {
2244 let executor = deterministic::Runner::default();
2245 executor.start(|context| async move {
2246 let (network, mut oracle) = Network::new(
2247 context.with_label("network"),
2248 Config {
2249 max_size: 1024 * 1024,
2250 disconnect_on_block: true,
2251 tracked_peer_sets: None,
2252 },
2253 );
2254 network.start();
2255
2256 let pk_sender = PrivateKey::from_seed(1).public_key();
2257 let pk_receiver = PrivateKey::from_seed(2).public_key();
2258
2259 let (mut sender_tx, _) = oracle
2261 .control(pk_sender.clone())
2262 .register(0, TEST_QUOTA)
2263 .await
2264 .unwrap();
2265 let (_, mut receiver_rx) = oracle
2266 .control(pk_receiver.clone())
2267 .register(0, TEST_QUOTA)
2268 .await
2269 .unwrap();
2270 oracle
2271 .add_link(
2272 pk_sender.clone(),
2273 pk_receiver.clone(),
2274 Link {
2275 latency: Duration::ZERO,
2276 jitter: Duration::ZERO,
2277 success_rate: 1.0,
2278 },
2279 )
2280 .await
2281 .unwrap();
2282
2283 oracle
2285 .limit_bandwidth(pk_receiver.clone(), None, Some(0))
2286 .await
2287 .unwrap();
2288
2289 let msg1 = Bytes::from(vec![1u8; 20_000]); let sent = sender_tx
2292 .send(Recipients::One(pk_receiver.clone()), msg1.clone(), false)
2293 .await
2294 .unwrap();
2295 assert_eq!(sent.len(), 1);
2296 assert_eq!(sent[0], pk_receiver);
2297
2298 select! {
2300 _ = receiver_rx.recv() => {
2301 panic!("unexpected message");
2302 },
2303 _ = context.sleep(Duration::from_secs(10)) => {},
2304 }
2305
2306 oracle
2308 .limit_bandwidth(pk_receiver.clone(), None, None)
2309 .await
2310 .unwrap();
2311
2312 select! {
2314 _ = receiver_rx.recv() => {},
2315 _ = context.sleep(Duration::from_secs(1)) => {
2316 panic!("timeout");
2317 },
2318 }
2319 });
2320 }
2321
2322 #[test]
2323 fn test_zero_sender_egress_bandwidth() {
2324 let executor = deterministic::Runner::default();
2325 executor.start(|context| async move {
2326 let (network, mut oracle) = Network::new(
2327 context.with_label("network"),
2328 Config {
2329 max_size: 1024 * 1024,
2330 disconnect_on_block: true,
2331 tracked_peer_sets: None,
2332 },
2333 );
2334 network.start();
2335
2336 let pk_sender = PrivateKey::from_seed(1).public_key();
2337 let pk_receiver = PrivateKey::from_seed(2).public_key();
2338
2339 let (mut sender_tx, _) = oracle
2341 .control(pk_sender.clone())
2342 .register(0, TEST_QUOTA)
2343 .await
2344 .unwrap();
2345 let (_, mut receiver_rx) = oracle
2346 .control(pk_receiver.clone())
2347 .register(0, TEST_QUOTA)
2348 .await
2349 .unwrap();
2350 oracle
2351 .add_link(
2352 pk_sender.clone(),
2353 pk_receiver.clone(),
2354 Link {
2355 latency: Duration::ZERO,
2356 jitter: Duration::ZERO,
2357 success_rate: 1.0,
2358 },
2359 )
2360 .await
2361 .unwrap();
2362
2363 oracle
2365 .limit_bandwidth(pk_sender.clone(), Some(0), None)
2366 .await
2367 .unwrap();
2368
2369 let msg1 = Bytes::from(vec![1u8; 20_000]); let sent = sender_tx
2372 .send(Recipients::One(pk_receiver.clone()), msg1.clone(), false)
2373 .await
2374 .unwrap();
2375 assert_eq!(sent.len(), 1);
2376 assert_eq!(sent[0], pk_receiver);
2377
2378 select! {
2380 _ = receiver_rx.recv() => {
2381 panic!("unexpected message");
2382 },
2383 _ = context.sleep(Duration::from_secs(10)) => {},
2384 }
2385
2386 oracle
2388 .limit_bandwidth(pk_sender.clone(), None, None)
2389 .await
2390 .unwrap();
2391
2392 select! {
2394 _ = receiver_rx.recv() => {},
2395 _ = context.sleep(Duration::from_secs(1)) => {
2396 panic!("timeout");
2397 },
2398 }
2399 });
2400 }
2401
2402 #[test]
2403 fn register_peer_set() {
2404 let executor = deterministic::Runner::default();
2405 executor.start(|context| async move {
2406 let (network, oracle) = Network::new(
2407 context.with_label("network"),
2408 Config {
2409 max_size: 1024 * 1024,
2410 disconnect_on_block: true,
2411 tracked_peer_sets: Some(3),
2412 },
2413 );
2414 network.start();
2415
2416 let mut manager = oracle.manager();
2417 assert_eq!(manager.peer_set(0).await, Some([].try_into().unwrap()));
2418
2419 let pk1 = PrivateKey::from_seed(1).public_key();
2420 let pk2 = PrivateKey::from_seed(2).public_key();
2421 manager
2422 .update(0xFF, [pk1.clone(), pk2.clone()].try_into().unwrap())
2423 .await;
2424
2425 assert_eq!(
2426 manager.peer_set(0xFF).await.unwrap(),
2427 [pk1, pk2].try_into().unwrap()
2428 );
2429 });
2430 }
2431
2432 #[test]
2433 fn test_socket_manager() {
2434 let executor = deterministic::Runner::default();
2435 executor.start(|context| async move {
2436 let (network, oracle) = Network::new(
2437 context.with_label("network"),
2438 Config {
2439 max_size: 1024 * 1024,
2440 disconnect_on_block: true,
2441 tracked_peer_sets: Some(3),
2442 },
2443 );
2444 network.start();
2445
2446 let pk1 = PrivateKey::from_seed(1).public_key();
2447 let pk2 = PrivateKey::from_seed(2).public_key();
2448 let addr1: Address = SocketAddr::from(([127, 0, 0, 1], 4000)).into();
2449 let addr2: Address = SocketAddr::from(([127, 0, 0, 1], 4001)).into();
2450
2451 let mut manager = oracle.socket_manager();
2452 let peers: Map<_, _> = [(pk1.clone(), addr1.clone()), (pk2.clone(), addr2.clone())]
2453 .try_into()
2454 .unwrap();
2455 manager.update(1, peers).await;
2456
2457 let peer_set = manager.peer_set(1).await.expect("peer set missing");
2458 let keys: Vec<_> = Vec::from(peer_set.clone());
2459 assert_eq!(keys, vec![pk1.clone(), pk2.clone()]);
2460
2461 let mut subscription = manager.subscribe().await;
2462 let (id, latest, all) = subscription.next().await.unwrap();
2463 assert_eq!(id, 1);
2464 let latest_keys: Vec<_> = Vec::from(latest.clone());
2465 assert_eq!(latest_keys, vec![pk1.clone(), pk2.clone()]);
2466 let all_keys: Vec<_> = Vec::from(all.clone());
2467 assert_eq!(all_keys, vec![pk1.clone(), pk2.clone()]);
2468
2469 let peers: Map<_, _> = [(pk2.clone(), addr2)].try_into().unwrap();
2470 manager.update(2, peers).await;
2471
2472 let (id, latest, all) = subscription.next().await.unwrap();
2473 assert_eq!(id, 2);
2474 let latest_keys: Vec<_> = Vec::from(latest);
2475 assert_eq!(latest_keys, vec![pk2.clone()]);
2476 let all_keys: Vec<_> = Vec::from(all);
2477 assert_eq!(all_keys, vec![pk1, pk2]);
2478 });
2479 }
2480
2481 #[test]
2482 fn test_socket_manager_with_asymmetric_addresses() {
2483 let executor = deterministic::Runner::default();
2484 executor.start(|context| async move {
2485 let (network, oracle) = Network::new(
2486 context.with_label("network"),
2487 Config {
2488 max_size: 1024 * 1024,
2489 disconnect_on_block: true,
2490 tracked_peer_sets: Some(3),
2491 },
2492 );
2493 network.start();
2494
2495 let pk1 = PrivateKey::from_seed(1).public_key();
2496 let pk2 = PrivateKey::from_seed(2).public_key();
2497
2498 let addr1 = Address::Asymmetric {
2500 ingress: Ingress::Socket(SocketAddr::from(([10, 0, 0, 1], 8080))),
2501 egress: SocketAddr::from(([192, 168, 1, 1], 9090)),
2502 };
2503 let addr2 = Address::Asymmetric {
2504 ingress: Ingress::Dns {
2505 host: hostname!("node2.example.com"),
2506 port: 8080,
2507 },
2508 egress: SocketAddr::from(([192, 168, 1, 2], 9090)),
2509 };
2510
2511 let mut manager = oracle.socket_manager();
2512 let peers: Map<_, _> = [(pk1.clone(), addr1), (pk2.clone(), addr2)]
2513 .try_into()
2514 .unwrap();
2515 manager.update(1, peers).await;
2516
2517 let peer_set = manager.peer_set(1).await.expect("peer set missing");
2519 let keys: Vec<_> = Vec::from(peer_set);
2520 assert_eq!(keys, vec![pk1.clone(), pk2.clone()]);
2521
2522 let mut subscription = manager.subscribe().await;
2524 let (id, latest, _all) = subscription.next().await.unwrap();
2525 assert_eq!(id, 1);
2526 let latest_keys: Vec<_> = Vec::from(latest);
2527 assert_eq!(latest_keys, vec![pk1, pk2]);
2528 });
2529 }
2530
2531 #[test]
2532 fn test_peer_set_window_management() {
2533 let executor = deterministic::Runner::default();
2534 executor.start(|context| async move {
2535 let (network, mut oracle) = Network::new(
2536 context.with_label("network"),
2537 Config {
2538 max_size: 1024 * 1024,
2539 disconnect_on_block: true,
2540 tracked_peer_sets: Some(2), },
2542 );
2543 network.start();
2544
2545 let pk1 = PrivateKey::from_seed(1).public_key();
2547 let pk2 = PrivateKey::from_seed(2).public_key();
2548 let pk3 = PrivateKey::from_seed(3).public_key();
2549 let pk4 = PrivateKey::from_seed(4).public_key();
2550
2551 let mut manager = oracle.manager();
2553 manager
2554 .update(1, vec![pk1.clone(), pk2.clone()].try_into().unwrap())
2555 .await;
2556
2557 let (mut sender1, _receiver1) = oracle
2559 .control(pk1.clone())
2560 .register(0, TEST_QUOTA)
2561 .await
2562 .unwrap();
2563 let (mut sender2, _receiver2) = oracle
2564 .control(pk2.clone())
2565 .register(0, TEST_QUOTA)
2566 .await
2567 .unwrap();
2568 let (mut sender3, _receiver3) = oracle
2569 .control(pk3.clone())
2570 .register(0, TEST_QUOTA)
2571 .await
2572 .unwrap();
2573 let (_mut_sender4, _receiver4) = oracle
2574 .control(pk4.clone())
2575 .register(0, TEST_QUOTA)
2576 .await
2577 .unwrap();
2578
2579 for peer_a in &[pk1.clone(), pk2.clone(), pk3.clone(), pk4.clone()] {
2581 for peer_b in &[pk1.clone(), pk2.clone(), pk3.clone(), pk4.clone()] {
2582 if peer_a != peer_b {
2583 oracle
2584 .add_link(
2585 peer_a.clone(),
2586 peer_b.clone(),
2587 Link {
2588 latency: Duration::from_millis(1),
2589 jitter: Duration::ZERO,
2590 success_rate: 1.0,
2591 },
2592 )
2593 .await
2594 .unwrap();
2595 }
2596 }
2597 }
2598
2599 let sent = sender1
2601 .send(Recipients::One(pk2.clone()), Bytes::from("msg1"), false)
2602 .await
2603 .unwrap();
2604 assert_eq!(sent.len(), 1);
2605
2606 let sent = sender1
2608 .send(Recipients::One(pk3.clone()), Bytes::from("msg2"), false)
2609 .await
2610 .unwrap();
2611 assert_eq!(sent.len(), 0);
2612
2613 manager
2615 .update(2, vec![pk2.clone(), pk3.clone()].try_into().unwrap())
2616 .await;
2617
2618 let sent = sender1
2620 .send(Recipients::One(pk3.clone()), Bytes::from("msg3"), false)
2621 .await
2622 .unwrap();
2623 assert_eq!(sent.len(), 1);
2624
2625 manager
2627 .update(3, vec![pk3.clone(), pk4.clone()].try_into().unwrap())
2628 .await;
2629
2630 let sent = sender2
2633 .send(Recipients::One(pk1.clone()), Bytes::from("msg4"), false)
2634 .await
2635 .unwrap();
2636 assert_eq!(sent.len(), 0);
2637
2638 let sent = sender2
2640 .send(Recipients::One(pk3.clone()), Bytes::from("msg5"), false)
2641 .await
2642 .unwrap();
2643 assert_eq!(sent.len(), 1);
2644
2645 let sent = sender3
2647 .send(Recipients::One(pk4.clone()), Bytes::from("msg6"), false)
2648 .await
2649 .unwrap();
2650 assert_eq!(sent.len(), 1);
2651
2652 let peer_set_2 = manager.peer_set(2).await.unwrap();
2654 assert!(peer_set_2.as_ref().contains(&pk2));
2655 assert!(peer_set_2.as_ref().contains(&pk3));
2656
2657 let peer_set_3 = manager.peer_set(3).await.unwrap();
2658 assert!(peer_set_3.as_ref().contains(&pk3));
2659 assert!(peer_set_3.as_ref().contains(&pk4));
2660
2661 assert!(manager.peer_set(1).await.is_none());
2663 });
2664 }
2665
2666 #[test]
2667 fn test_sender_removed_from_tracked_peer_set_drops_message() {
2668 let executor = deterministic::Runner::default();
2669 executor.start(|context| async move {
2670 let (network, mut oracle) = Network::new(
2672 context.with_label("network"),
2673 Config {
2674 max_size: 1024 * 1024,
2675 disconnect_on_block: true,
2676 tracked_peer_sets: Some(1),
2677 },
2678 );
2679 network.start();
2680 let mut manager = oracle.manager();
2681 let mut subscription = manager.subscribe().await;
2682
2683 let sender_pk = PrivateKey::from_seed(1).public_key();
2685 let recipient_pk = PrivateKey::from_seed(2).public_key();
2686 manager
2687 .update(
2688 1,
2689 vec![sender_pk.clone(), recipient_pk.clone()]
2690 .try_into()
2691 .unwrap(),
2692 )
2693 .await;
2694 let (id, _, _) = subscription.next().await.unwrap();
2695 assert_eq!(id, 1);
2696
2697 let (mut sender, _) = oracle
2699 .control(sender_pk.clone())
2700 .register(0, TEST_QUOTA)
2701 .await
2702 .unwrap();
2703 let (_sender2, mut receiver) = oracle
2704 .control(recipient_pk.clone())
2705 .register(0, TEST_QUOTA)
2706 .await
2707 .unwrap();
2708
2709 oracle
2711 .add_link(
2712 sender_pk.clone(),
2713 recipient_pk.clone(),
2714 Link {
2715 latency: Duration::from_millis(1),
2716 jitter: Duration::ZERO,
2717 success_rate: 1.0,
2718 },
2719 )
2720 .await
2721 .unwrap();
2722
2723 let initial_msg = Bytes::from("tracked");
2725 let sent = sender
2726 .send(
2727 Recipients::One(recipient_pk.clone()),
2728 initial_msg.clone(),
2729 false,
2730 )
2731 .await
2732 .unwrap();
2733 assert_eq!(sent.len(), 1);
2734 assert_eq!(sent[0], recipient_pk);
2735 let (_pk, received) = receiver.recv().await.unwrap();
2736 assert_eq!(received, initial_msg);
2737
2738 let other_pk = PrivateKey::from_seed(3).public_key();
2740 manager
2741 .update(2, vec![recipient_pk.clone(), other_pk].try_into().unwrap())
2742 .await;
2743 let (id, _, _) = subscription.next().await.unwrap();
2744 assert_eq!(id, 2);
2745
2746 let sent = sender
2748 .send(
2749 Recipients::One(recipient_pk.clone()),
2750 Bytes::from("untracked"),
2751 false,
2752 )
2753 .await
2754 .unwrap();
2755 assert!(sent.is_empty());
2756
2757 select! {
2759 _ = receiver.recv() => {
2760 panic!("unexpected message");
2761 },
2762 _ = context.sleep(Duration::from_secs(10)) => {},
2763 }
2764
2765 manager
2767 .update(
2768 3,
2769 vec![sender_pk.clone(), recipient_pk.clone()]
2770 .try_into()
2771 .unwrap(),
2772 )
2773 .await;
2774 let (id, _, _) = subscription.next().await.unwrap();
2775 assert_eq!(id, 3);
2776
2777 let sent = sender
2779 .send(
2780 Recipients::One(recipient_pk.clone()),
2781 initial_msg.clone(),
2782 false,
2783 )
2784 .await
2785 .unwrap();
2786 assert_eq!(sent.len(), 1);
2787 assert_eq!(sent[0], recipient_pk);
2788 let (_pk, received) = receiver.recv().await.unwrap();
2789 assert_eq!(received, initial_msg);
2790 });
2791 }
2792
2793 #[test]
2794 fn test_subscribe_to_peer_sets() {
2795 let executor = deterministic::Runner::default();
2796 executor.start(|context| async move {
2797 let (network, oracle) = Network::new(
2798 context.with_label("network"),
2799 Config {
2800 max_size: 1024 * 1024,
2801 disconnect_on_block: true,
2802 tracked_peer_sets: Some(2),
2803 },
2804 );
2805 network.start();
2806
2807 let mut manager = oracle.manager();
2809 let mut subscription = manager.subscribe().await;
2810
2811 let pk1 = PrivateKey::from_seed(1).public_key();
2813 let pk2 = PrivateKey::from_seed(2).public_key();
2814 let pk3 = PrivateKey::from_seed(3).public_key();
2815
2816 manager
2818 .update(1, vec![pk1.clone(), pk2.clone()].try_into().unwrap())
2819 .await;
2820
2821 let (peer_set_id, peer_set, all) = subscription.next().await.unwrap();
2823 assert_eq!(peer_set_id, 1);
2824 assert_eq!(peer_set, vec![pk1.clone(), pk2.clone()].try_into().unwrap());
2825 assert_eq!(all, vec![pk1.clone(), pk2.clone()].try_into().unwrap());
2826
2827 manager
2829 .update(2, vec![pk2.clone(), pk3.clone()].try_into().unwrap())
2830 .await;
2831
2832 let (peer_set_id, peer_set, all) = subscription.next().await.unwrap();
2834 assert_eq!(peer_set_id, 2);
2835 assert_eq!(peer_set, vec![pk2.clone(), pk3.clone()].try_into().unwrap());
2836 assert_eq!(
2837 all,
2838 vec![pk1.clone(), pk2.clone(), pk3.clone()]
2839 .try_into()
2840 .unwrap()
2841 );
2842
2843 manager
2845 .update(3, vec![pk1.clone(), pk3.clone()].try_into().unwrap())
2846 .await;
2847
2848 let (peer_set_id, peer_set, all) = subscription.next().await.unwrap();
2850 assert_eq!(peer_set_id, 3);
2851 assert_eq!(peer_set, vec![pk1.clone(), pk3.clone()].try_into().unwrap());
2852 assert_eq!(
2853 all,
2854 vec![pk1.clone(), pk2.clone(), pk3.clone()]
2855 .try_into()
2856 .unwrap()
2857 );
2858
2859 manager
2861 .update(4, vec![pk1.clone(), pk3.clone()].try_into().unwrap())
2862 .await;
2863
2864 let (peer_set_id, peer_set, all) = subscription.next().await.unwrap();
2866 assert_eq!(peer_set_id, 4);
2867 assert_eq!(peer_set, vec![pk1.clone(), pk3.clone()].try_into().unwrap());
2868 assert_eq!(all, vec![pk1.clone(), pk3.clone()].try_into().unwrap());
2869 });
2870 }
2871
2872 #[test]
2873 fn test_multiple_subscriptions() {
2874 let executor = deterministic::Runner::default();
2875 executor.start(|context| async move {
2876 let (network, oracle) = Network::new(
2877 context.with_label("network"),
2878 Config {
2879 max_size: 1024 * 1024,
2880 disconnect_on_block: true,
2881 tracked_peer_sets: Some(3),
2882 },
2883 );
2884 network.start();
2885
2886 let mut manager = oracle.manager();
2888 let mut subscription1 = manager.subscribe().await;
2889 let mut subscription2 = manager.subscribe().await;
2890 let mut subscription3 = manager.subscribe().await;
2891
2892 let pk1 = PrivateKey::from_seed(1).public_key();
2894 let pk2 = PrivateKey::from_seed(2).public_key();
2895
2896 manager
2898 .update(1, vec![pk1.clone(), pk2.clone()].try_into().unwrap())
2899 .await;
2900
2901 let (id1, _, _) = subscription1.next().await.unwrap();
2903 let (id2, _, _) = subscription2.next().await.unwrap();
2904 let (id3, _, _) = subscription3.next().await.unwrap();
2905
2906 assert_eq!(id1, 1);
2907 assert_eq!(id2, 1);
2908 assert_eq!(id3, 1);
2909
2910 drop(subscription2);
2912
2913 manager
2915 .update(2, vec![pk1.clone(), pk2.clone()].try_into().unwrap())
2916 .await;
2917
2918 let (id1, _, _) = subscription1.next().await.unwrap();
2920 let (id3, _, _) = subscription3.next().await.unwrap();
2921
2922 assert_eq!(id1, 2);
2923 assert_eq!(id3, 2);
2924 });
2925 }
2926
2927 #[test]
2928 fn test_subscription_includes_self_when_registered() {
2929 let executor = deterministic::Runner::default();
2930 executor.start(|context| async move {
2931 let (network, oracle) = Network::new(
2932 context.with_label("network"),
2933 Config {
2934 max_size: 1024 * 1024,
2935 disconnect_on_block: true,
2936 tracked_peer_sets: Some(2),
2937 },
2938 );
2939 network.start();
2940
2941 let self_pk = PrivateKey::from_seed(0).public_key();
2943 let other_pk = PrivateKey::from_seed(1).public_key();
2944
2945 let (_sender, _receiver) = oracle
2947 .control(self_pk.clone())
2948 .register(0, TEST_QUOTA)
2949 .await
2950 .unwrap();
2951
2952 let mut manager = oracle.manager();
2954 let mut subscription = manager.subscribe().await;
2955
2956 manager
2958 .update(1, vec![other_pk.clone()].try_into().unwrap())
2959 .await;
2960
2961 let (id, new, all) = subscription.next().await.unwrap();
2963 assert_eq!(id, 1);
2964 assert_eq!(new.len(), 1);
2965 assert_eq!(all.len(), 1);
2966
2967 assert!(
2969 new.position(&self_pk).is_none(),
2970 "new set should not include self"
2971 );
2972 assert!(
2973 new.position(&other_pk).is_some(),
2974 "new set should include other"
2975 );
2976
2977 assert!(
2979 all.position(&self_pk).is_none(),
2980 "tracked peers should not include self"
2981 );
2982 assert!(
2983 all.position(&other_pk).is_some(),
2984 "tracked peers should include other"
2985 );
2986
2987 manager
2989 .update(
2990 2,
2991 vec![self_pk.clone(), other_pk.clone()].try_into().unwrap(),
2992 )
2993 .await;
2994
2995 let (id, new, all) = subscription.next().await.unwrap();
2996 assert_eq!(id, 2);
2997 assert_eq!(new.len(), 2);
2998 assert_eq!(all.len(), 2);
2999
3000 assert!(
3002 new.position(&self_pk).is_some(),
3003 "new set should include self"
3004 );
3005 assert!(
3006 new.position(&other_pk).is_some(),
3007 "new set should include other"
3008 );
3009
3010 assert!(
3012 all.position(&self_pk).is_some(),
3013 "tracked peers should include self"
3014 );
3015 assert!(
3016 all.position(&other_pk).is_some(),
3017 "tracked peers should include other"
3018 );
3019 });
3020 }
3021
3022 #[test]
3023 fn test_rate_limiting() {
3024 let executor = deterministic::Runner::default();
3025 executor.start(|context| async move {
3026 let cfg = Config {
3027 max_size: 1024 * 1024,
3028 disconnect_on_block: true,
3029 tracked_peer_sets: Some(3),
3030 };
3031 let network_context = context.with_label("network");
3032 let (network, mut oracle) = Network::new(network_context.clone(), cfg);
3033 network.start();
3034
3035 let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
3037 let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
3038
3039 let mut manager = oracle.manager();
3041 manager
3042 .update(0, [pk1.clone(), pk2.clone()].try_into().unwrap())
3043 .await;
3044
3045 let restrictive_quota = Quota::per_second(NZU32!(1));
3047 let mut control1 = oracle.control(pk1.clone());
3048 let (mut sender, _) = control1.register(0, restrictive_quota).await.unwrap();
3049 let mut control2 = oracle.control(pk2.clone());
3050 let (_, mut receiver) = control2.register(0, TEST_QUOTA).await.unwrap();
3051
3052 let link = ingress::Link {
3054 latency: Duration::from_millis(0),
3055 jitter: Duration::from_millis(0),
3056 success_rate: 1.0,
3057 };
3058 oracle
3059 .add_link(pk1.clone(), pk2.clone(), link.clone())
3060 .await
3061 .unwrap();
3062 oracle.add_link(pk2.clone(), pk1, link).await.unwrap();
3063
3064 let msg1 = Bytes::from_static(b"message1");
3066 let result1 = sender
3067 .send(Recipients::One(pk2.clone()), msg1.clone(), false)
3068 .await
3069 .unwrap();
3070 assert_eq!(result1.len(), 1, "first message should be sent");
3071
3072 let (_, received1) = receiver.recv().await.unwrap();
3074 assert_eq!(received1, msg1);
3075
3076 let msg2 = Bytes::from_static(b"message2");
3078 let result2 = sender
3079 .send(Recipients::One(pk2.clone()), msg2.clone(), false)
3080 .await
3081 .unwrap();
3082 assert_eq!(
3083 result2.len(),
3084 0,
3085 "second message should be rate-limited (skipped)"
3086 );
3087
3088 context.sleep(Duration::from_secs(1)).await;
3090
3091 let msg3 = Bytes::from_static(b"message3");
3093 let result3 = sender
3094 .send(Recipients::One(pk2.clone()), msg3.clone(), false)
3095 .await
3096 .unwrap();
3097 assert_eq!(result3.len(), 1, "third message should be sent after wait");
3098
3099 let (_, received3) = receiver.recv().await.unwrap();
3101 assert_eq!(received3, msg3);
3102 });
3103 }
3104}