1mod bandwidth;
142mod ingress;
143mod metrics;
144mod network;
145mod transmitter;
146
147use thiserror::Error;
148
/// Errors surfaced by this module.
#[derive(Debug, Error)]
pub enum Error {
    /// The network is closed. The tests in this file observe it when receiving
    /// on a channel registration that has since been replaced.
    #[error("network closed")]
    NetworkClosed,
    /// A link was requested whose two endpoints are the same peer.
    #[error("not valid to link self")]
    LinkingSelf,
    /// The requested link already exists.
    #[error("link already exists")]
    LinkExists,
    /// The referenced link does not exist (for example, removing a link that
    /// was already removed).
    #[error("link missing")]
    LinkMissing,
    /// The supplied success rate lies outside `[0, 1]`; the offending value is
    /// carried in the variant.
    #[error("invalid success rate (must be in [0, 1]): {0}")]
    InvalidSuccessRate(f64),
    /// Sending a frame failed.
    #[error("send_frame failed")]
    SendFrameFailed,
    /// Receiving a frame failed.
    #[error("recv_frame failed")]
    RecvFrameFailed,
    /// Binding failed.
    #[error("bind failed")]
    BindFailed,
    /// Accepting a connection failed.
    #[error("accept failed")]
    AcceptFailed,
    /// Dialing failed.
    #[error("dial failed")]
    DialFailed,
    /// The referenced peer is not known.
    #[error("peer missing")]
    PeerMissing,
}
175
176pub use ingress::{Control, Link, Manager, Oracle, SocketManager};
177pub use network::{
178 Config, ConnectedPeerProvider, Network, Receiver, Sender, SplitForwarder, SplitOrigin,
179 SplitRouter, SplitSender, SplitTarget, UnlimitedSender,
180};
181
182#[cfg(test)]
183mod tests {
184 use super::*;
185 use crate::{
186 Address, AddressableManager, AddressableTrackedPeers, CheckedSender as _, Ingress,
187 LimitedSender as _, Manager, Provider, Receiver, Recipients, Sender, TrackedPeers,
188 };
189 use commonware_cryptography::{
190 ed25519::{self, PrivateKey, PublicKey},
191 Signer as _,
192 };
193 use commonware_macros::{select, test_group};
194 use commonware_runtime::{
195 deterministic, reschedule, telemetry::metrics::count_running_tasks, Clock, IoBuf, Quota,
196 Runner, Spawner, Supervisor as _,
197 };
198 use commonware_utils::{
199 channel::mpsc,
200 hostname, ordered,
201 ordered::{Map, Set},
202 NZUsize, NZU32,
203 };
204 use rand::Rng;
205 use std::{
206 collections::{BTreeMap, HashMap, HashSet},
207 net::SocketAddr,
208 num::NonZeroU32,
209 time::Duration,
210 };
211
    /// Quota passed to every channel registration in these tests:
    /// `NonZeroU32::MAX` per second.
    // NOTE(review): presumably large enough that rate limiting never affects
    // the tests -- confirm against `Quota` semantics.
    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
214
215 async fn track_peers<I>(oracle: &Oracle<PublicKey, deterministic::Context>, peers: I)
216 where
217 I: IntoIterator<Item = PublicKey>,
218 {
219 let mut manager = oracle.manager();
220 manager.track(0, Set::from_iter_dedup(peers));
221 assert!(manager.peer_set(0).await.is_some());
222 }
223
224 async fn wait_for_task_count(
225 context: &deterministic::Context,
226 prefix: &str,
227 expected: impl Fn(usize) -> bool,
228 ) {
229 loop {
230 let count = count_running_tasks(context, prefix);
231 if expected(count) {
232 return;
233 }
234 reschedule().await;
235 }
236 }
237
238 fn simulate_messages(seed: u64, size: usize) -> (String, Vec<usize>) {
239 let executor = deterministic::Runner::seeded(seed);
240 executor.start(|context| async move {
241 let (network, oracle) = Network::new(
243 context.child("network"),
244 Config {
245 max_size: 1024 * 1024,
246 disconnect_on_block: true,
247 tracked_peer_sets: NZUsize!(1),
248 },
249 );
250
251 network.start();
253
254 let mut agents = BTreeMap::new();
256 let (seen_sender, mut seen_receiver) = mpsc::channel(1024);
257 for i in 0..size {
258 let pk = PrivateKey::from_seed(i as u64).public_key();
259 let (sender, mut receiver) = oracle
260 .control(pk.clone())
261 .register(0, TEST_QUOTA)
262 .await
263 .unwrap();
264 agents.insert(pk, sender);
265 let agent_sender = seen_sender.clone();
266 context.child("agent_receiver").spawn(move |_| async move {
267 for _ in 0..size {
268 receiver.recv().await.unwrap();
269 }
270 agent_sender.send(i).await.unwrap();
271
272 });
274 }
275 track_peers(&oracle, agents.keys().cloned()).await;
276
277 let only_inbound = PrivateKey::from_seed(0).public_key();
279 for agent in agents.keys() {
280 if agent == &only_inbound {
281 continue;
283 }
284 for other in agents.keys() {
285 let result = oracle
286 .add_link(
287 agent.clone(),
288 other.clone(),
289 Link {
290 latency: Duration::from_millis(5),
291 jitter: Duration::from_millis(2),
292 success_rate: 0.75,
293 },
294 )
295 .await;
296 if agent == other {
297 assert!(matches!(result, Err(Error::LinkingSelf)));
298 } else {
299 assert!(result.is_ok());
300 }
301 }
302 }
303
304 context
305 .child("agent_sender")
306 .spawn(|mut context| async move {
307 let keys = agents.keys().cloned().collect::<Vec<_>>();
309
310 loop {
311 let index = context.gen_range(0..keys.len());
312 let sender = &keys[index];
313 let msg = format!("hello from {sender:?}");
314 let msg = IoBuf::copy_from_slice(msg.as_bytes());
315 let message_sender = agents.get_mut(sender).unwrap();
316 let sent = message_sender.send(Recipients::All, msg, false);
317 assert_eq!(sent.len(), keys.len() - 1);
318 reschedule().await;
319 }
320 });
321
322 let mut results = Vec::new();
324 for _ in 0..size {
325 results.push(seen_receiver.recv().await.unwrap());
326 }
327 (context.auditor().state(), results)
328 })
329 }
330
331 fn compare_outputs(seeds: u64, size: usize) {
332 let mut outputs = Vec::new();
334 for seed in 0..seeds {
335 outputs.push(simulate_messages(seed, size));
336 }
337
338 for seed in 0..seeds {
340 let output = simulate_messages(seed, size);
341 assert_eq!(output, outputs[seed as usize]);
342 }
343 }
344
345 #[test_group("slow")]
346 #[test]
347 fn test_determinism() {
348 compare_outputs(25, 25);
349 }
350
351 #[test]
352 #[should_panic(expected = "message too large")]
353 fn test_message_too_big() {
354 let executor = deterministic::Runner::default();
355 executor.start(|mut context| async move {
356 let (network, oracle) = Network::new(
358 context.child("network"),
359 Config {
360 max_size: 1024 * 1024,
361 disconnect_on_block: true,
362 tracked_peer_sets: NZUsize!(1),
363 },
364 );
365
366 network.start();
368
369 let mut agents = HashMap::new();
371 for i in 0..10 {
372 let pk = PrivateKey::from_seed(i as u64).public_key();
373 let (sender, _) = oracle
374 .control(pk.clone())
375 .register(0, TEST_QUOTA)
376 .await
377 .unwrap();
378 agents.insert(pk, sender);
379 }
380
381 let keys = agents.keys().collect::<Vec<_>>();
383 let index = context.gen_range(0..keys.len());
384 let sender = keys[index];
385 let mut message_sender = agents.get(sender).unwrap().clone();
386 let mut msg = vec![0u8; 1024 * 1024 + 1];
387 context.fill(&mut msg[..]);
388 message_sender.send(Recipients::All, msg, false);
389 });
390 }
391
392 #[test]
393 fn test_linking_self() {
394 let executor = deterministic::Runner::default();
395 executor.start(|context| async move {
396 let (network, oracle) = Network::new(
398 context.child("network"),
399 Config {
400 max_size: 1024 * 1024,
401 disconnect_on_block: true,
402 tracked_peer_sets: NZUsize!(1),
403 },
404 );
405
406 network.start();
408
409 let pk = PrivateKey::from_seed(0).public_key();
411 oracle
412 .control(pk.clone())
413 .register(0, TEST_QUOTA)
414 .await
415 .unwrap();
416
417 let result = oracle
419 .add_link(
420 pk.clone(),
421 pk,
422 Link {
423 latency: Duration::from_millis(5),
424 jitter: Duration::from_millis(2),
425 success_rate: 0.75,
426 },
427 )
428 .await;
429
430 assert!(matches!(result, Err(Error::LinkingSelf)));
432 });
433 }
434
435 #[test]
436 fn test_duplicate_channel() {
437 let executor = deterministic::Runner::default();
438 executor.start(|context| async move {
439 let (network, oracle) = Network::new(
441 context.child("network"),
442 Config {
443 max_size: 1024 * 1024,
444 disconnect_on_block: true,
445 tracked_peer_sets: NZUsize!(1),
446 },
447 );
448
449 network.start();
451
452 let my_pk = PrivateKey::from_seed(0).public_key();
454 let other_pk = PrivateKey::from_seed(1).public_key();
455 oracle
456 .add_link(
457 my_pk.clone(),
458 other_pk.clone(),
459 Link {
460 latency: Duration::from_millis(10),
461 jitter: Duration::from_millis(1),
462 success_rate: 1.0,
463 },
464 )
465 .await
466 .unwrap();
467 oracle
468 .add_link(
469 other_pk.clone(),
470 my_pk.clone(),
471 Link {
472 latency: Duration::from_millis(10),
473 jitter: Duration::from_millis(1),
474 success_rate: 1.0,
475 },
476 )
477 .await
478 .unwrap();
479
480 let (mut my_sender, mut my_receiver) = oracle
482 .control(my_pk.clone())
483 .register(0, TEST_QUOTA)
484 .await
485 .unwrap();
486 let (mut other_sender, mut other_receiver) = oracle
487 .control(other_pk.clone())
488 .register(0, TEST_QUOTA)
489 .await
490 .unwrap();
491 track_peers(&oracle, [my_pk.clone(), other_pk.clone()]).await;
492
493 let msg = IoBuf::from(b"hello");
495 let sent = my_sender.send(Recipients::One(other_pk.clone()), msg.clone(), false);
496 assert_eq!(sent.len(), 1);
497 let (from, message) = other_receiver.recv().await.unwrap();
498 assert_eq!(from, my_pk);
499 assert_eq!(message, msg.clone());
500 let sent = other_sender.send(Recipients::One(my_pk.clone()), msg.clone(), false);
501 assert_eq!(sent.len(), 1);
502 let (from, message) = my_receiver.recv().await.unwrap();
503 assert_eq!(from, other_pk);
504 assert_eq!(message, msg);
505
506 let (mut my_sender_2, mut my_receiver_2) = oracle
508 .control(my_pk.clone())
509 .register(0, TEST_QUOTA)
510 .await
511 .unwrap();
512
513 let msg = IoBuf::from(b"hello again");
515 let sent = my_sender_2.send(Recipients::One(other_pk.clone()), msg.clone(), false);
516 assert_eq!(sent.len(), 1);
517 let (from, message) = other_receiver.recv().await.unwrap();
518 assert_eq!(from, my_pk);
519 assert_eq!(message, msg.clone());
520 let sent = other_sender.send(Recipients::One(my_pk.clone()), msg.clone(), false);
521 assert_eq!(sent.len(), 1);
522 let (from, message) = my_receiver_2.recv().await.unwrap();
523 assert_eq!(from, other_pk);
524 assert_eq!(message, msg.clone());
525
526 assert!(matches!(
528 my_receiver.recv().await,
529 Err(Error::NetworkClosed)
530 ));
531
532 my_sender.send(Recipients::One(other_pk.clone()), msg, false);
534 });
535 }
536
537 #[test]
538 fn test_invalid_success_rate() {
539 let executor = deterministic::Runner::default();
540 executor.start(|context| async move {
541 let (network, oracle) = Network::new(
543 context.child("network"),
544 Config {
545 max_size: 1024 * 1024,
546 disconnect_on_block: true,
547 tracked_peer_sets: NZUsize!(1),
548 },
549 );
550
551 network.start();
553
554 let pk1 = PrivateKey::from_seed(0).public_key();
556 let pk2 = PrivateKey::from_seed(1).public_key();
557 oracle
558 .control(pk1.clone())
559 .register(0, TEST_QUOTA)
560 .await
561 .unwrap();
562 oracle
563 .control(pk2.clone())
564 .register(0, TEST_QUOTA)
565 .await
566 .unwrap();
567
568 let result = oracle
570 .add_link(
571 pk1,
572 pk2,
573 Link {
574 latency: Duration::from_millis(5),
575 jitter: Duration::from_millis(2),
576 success_rate: 1.5,
577 },
578 )
579 .await;
580
581 assert!(matches!(result, Err(Error::InvalidSuccessRate(_))));
583 });
584 }
585
586 #[test]
587 fn test_add_link_before_channel_registration() {
588 let executor = deterministic::Runner::default();
589 executor.start(|context| async move {
590 let pk1 = PrivateKey::from_seed(0).public_key();
592 let pk2 = PrivateKey::from_seed(1).public_key();
593
594 let (network, oracle) = Network::new_with_peers(
596 context.child("network"),
597 Config {
598 max_size: 1024 * 1024,
599 disconnect_on_block: true,
600 tracked_peer_sets: NZUsize!(3),
601 },
602 [pk1.clone(), pk2.clone()],
603 )
604 .await;
605 network.start();
606
607 oracle
609 .add_link(
610 pk1.clone(),
611 pk2.clone(),
612 Link {
613 latency: Duration::ZERO,
614 jitter: Duration::ZERO,
615 success_rate: 1.0,
616 },
617 )
618 .await
619 .unwrap();
620
621 let (mut sender1, _receiver1) = oracle
623 .control(pk1.clone())
624 .register(0, TEST_QUOTA)
625 .await
626 .unwrap();
627 let (_, mut receiver2) = oracle
628 .control(pk2.clone())
629 .register(0, TEST_QUOTA)
630 .await
631 .unwrap();
632
633 let msg1 = IoBuf::from(b"link-before-register-1");
635 sender1.send(Recipients::One(pk2.clone()), msg1.clone(), false);
636 let (from, received) = receiver2.recv().await.unwrap();
637 assert_eq!(from, pk1);
638 assert_eq!(received, msg1);
639 });
640 }
641
642 #[test]
643 fn test_simple_message_delivery() {
644 let executor = deterministic::Runner::default();
645 executor.start(|context| async move {
646 let (network, oracle) = Network::new(
648 context.child("network"),
649 Config {
650 max_size: 1024 * 1024,
651 disconnect_on_block: true,
652 tracked_peer_sets: NZUsize!(1),
653 },
654 );
655
656 network.start();
658
659 let pk1 = PrivateKey::from_seed(0).public_key();
661 let pk2 = PrivateKey::from_seed(1).public_key();
662 let (mut sender1, mut receiver1) = oracle
663 .control(pk1.clone())
664 .register(0, TEST_QUOTA)
665 .await
666 .unwrap();
667 let (mut sender2, mut receiver2) = oracle
668 .control(pk2.clone())
669 .register(0, TEST_QUOTA)
670 .await
671 .unwrap();
672
673 let _ = oracle
675 .control(pk1.clone())
676 .register(1, TEST_QUOTA)
677 .await
678 .unwrap();
679 let _ = oracle
680 .control(pk2.clone())
681 .register(2, TEST_QUOTA)
682 .await
683 .unwrap();
684 track_peers(&oracle, [pk1.clone(), pk2.clone()]).await;
685
686 oracle
688 .add_link(
689 pk1.clone(),
690 pk2.clone(),
691 Link {
692 latency: Duration::from_millis(5),
693 jitter: Duration::from_millis(2),
694 success_rate: 1.0,
695 },
696 )
697 .await
698 .unwrap();
699 oracle
700 .add_link(
701 pk2.clone(),
702 pk1.clone(),
703 Link {
704 latency: Duration::from_millis(5),
705 jitter: Duration::from_millis(2),
706 success_rate: 1.0,
707 },
708 )
709 .await
710 .unwrap();
711
712 let msg1 = IoBuf::from(b"hello from pk1");
714 let msg2 = IoBuf::from(b"hello from pk2");
715 sender1.send(Recipients::One(pk2.clone()), msg1.clone(), false);
716 sender2.send(Recipients::One(pk1.clone()), msg2.clone(), false);
717
718 let (sender, message) = receiver1.recv().await.unwrap();
720 assert_eq!(sender, pk2);
721 assert_eq!(message, msg2);
722 let (sender, message) = receiver2.recv().await.unwrap();
723 assert_eq!(sender, pk1);
724 assert_eq!(message, msg1);
725 });
726 }
727
728 #[test]
729 fn test_send_wrong_channel() {
730 let executor = deterministic::Runner::default();
731 executor.start(|context| async move {
732 let (network, oracle) = Network::new(
734 context.child("network"),
735 Config {
736 max_size: 1024 * 1024,
737 disconnect_on_block: true,
738 tracked_peer_sets: NZUsize!(1),
739 },
740 );
741
742 network.start();
744
745 let pk1 = PrivateKey::from_seed(0).public_key();
747 let pk2 = PrivateKey::from_seed(1).public_key();
748 let (mut sender1, _) = oracle
749 .control(pk1.clone())
750 .register(0, TEST_QUOTA)
751 .await
752 .unwrap();
753 let (_, mut receiver2) = oracle
754 .control(pk2.clone())
755 .register(1, TEST_QUOTA)
756 .await
757 .unwrap();
758 track_peers(&oracle, [pk1.clone(), pk2.clone()]).await;
759
760 oracle
762 .add_link(
763 pk1,
764 pk2.clone(),
765 Link {
766 latency: Duration::from_millis(5),
767 jitter: Duration::ZERO,
768 success_rate: 1.0,
769 },
770 )
771 .await
772 .unwrap();
773
774 let msg = IoBuf::from(b"hello from pk1");
776 sender1.send(Recipients::One(pk2), msg, false);
777
778 select! {
780 _ = receiver2.recv() => {
781 panic!("unexpected message");
782 },
783 _ = context.sleep(Duration::from_secs(1)) => {},
784 }
785 });
786 }
787
788 #[test]
789 fn test_dynamic_peers() {
790 let executor = deterministic::Runner::default();
791 executor.start(|context| async move {
792 let (network, oracle) = Network::new(
794 context.child("network"),
795 Config {
796 max_size: 1024 * 1024,
797 disconnect_on_block: true,
798 tracked_peer_sets: NZUsize!(1),
799 },
800 );
801
802 network.start();
804
805 let pk1 = PrivateKey::from_seed(0).public_key();
807 let pk2 = PrivateKey::from_seed(1).public_key();
808 let (mut sender1, mut receiver1) = oracle
809 .control(pk1.clone())
810 .register(0, TEST_QUOTA)
811 .await
812 .unwrap();
813 let (mut sender2, mut receiver2) = oracle
814 .control(pk2.clone())
815 .register(0, TEST_QUOTA)
816 .await
817 .unwrap();
818 track_peers(&oracle, [pk1.clone(), pk2.clone()]).await;
819
820 oracle
822 .add_link(
823 pk1.clone(),
824 pk2.clone(),
825 Link {
826 latency: Duration::from_millis(5),
827 jitter: Duration::from_millis(2),
828 success_rate: 1.0,
829 },
830 )
831 .await
832 .unwrap();
833 oracle
834 .add_link(
835 pk2.clone(),
836 pk1.clone(),
837 Link {
838 latency: Duration::from_millis(5),
839 jitter: Duration::from_millis(2),
840 success_rate: 1.0,
841 },
842 )
843 .await
844 .unwrap();
845
846 let msg1 = IoBuf::from(b"attempt 1: hello from pk1");
848 let msg2 = IoBuf::from(b"attempt 1: hello from pk2");
849 sender1.send(Recipients::One(pk2.clone()), msg1.clone(), false);
850 sender2.send(Recipients::One(pk1.clone()), msg2.clone(), false);
851
852 let (sender, message) = receiver1.recv().await.unwrap();
854 assert_eq!(sender, pk2);
855 assert_eq!(message, msg2);
856 let (sender, message) = receiver2.recv().await.unwrap();
857 assert_eq!(sender, pk1);
858 assert_eq!(message, msg1);
859 });
860 }
861
862 #[test]
863 fn test_dynamic_links() {
864 let executor = deterministic::Runner::default();
865 executor.start(|context| async move {
866 let (network, oracle) = Network::new(
868 context.child("network"),
869 Config {
870 max_size: 1024 * 1024,
871 disconnect_on_block: true,
872 tracked_peer_sets: NZUsize!(1),
873 },
874 );
875
876 network.start();
878
879 let pk1 = PrivateKey::from_seed(0).public_key();
881 let pk2 = PrivateKey::from_seed(1).public_key();
882 let (mut sender1, mut receiver1) = oracle
883 .control(pk1.clone())
884 .register(0, TEST_QUOTA)
885 .await
886 .unwrap();
887 let (mut sender2, mut receiver2) = oracle
888 .control(pk2.clone())
889 .register(0, TEST_QUOTA)
890 .await
891 .unwrap();
892 track_peers(&oracle, [pk1.clone(), pk2.clone()]).await;
893
894 let msg1 = IoBuf::from(b"attempt 1: hello from pk1");
896 let msg2 = IoBuf::from(b"attempt 1: hello from pk2");
897 sender1.send(Recipients::One(pk2.clone()), msg1.clone(), false);
898 sender2.send(Recipients::One(pk1.clone()), msg2.clone(), false);
899
900 select! {
902 _ = receiver1.recv() => {
903 panic!("unexpected message");
904 },
905 _ = receiver2.recv() => {
906 panic!("unexpected message");
907 },
908 _ = context.sleep(Duration::from_secs(1)) => {},
909 }
910
911 oracle
913 .add_link(
914 pk1.clone(),
915 pk2.clone(),
916 Link {
917 latency: Duration::from_millis(5),
918 jitter: Duration::from_millis(2),
919 success_rate: 1.0,
920 },
921 )
922 .await
923 .unwrap();
924 oracle
925 .add_link(
926 pk2.clone(),
927 pk1.clone(),
928 Link {
929 latency: Duration::from_millis(5),
930 jitter: Duration::from_millis(2),
931 success_rate: 1.0,
932 },
933 )
934 .await
935 .unwrap();
936
937 let msg1 = IoBuf::from(b"attempt 2: hello from pk1");
939 let msg2 = IoBuf::from(b"attempt 2: hello from pk2");
940 sender1.send(Recipients::One(pk2.clone()), msg1.clone(), false);
941 sender2.send(Recipients::One(pk1.clone()), msg2.clone(), false);
942
943 let (sender, message) = receiver1.recv().await.unwrap();
945 assert_eq!(sender, pk2);
946 assert_eq!(message, msg2);
947 let (sender, message) = receiver2.recv().await.unwrap();
948 assert_eq!(sender, pk1);
949 assert_eq!(message, msg1);
950
951 oracle.remove_link(pk1.clone(), pk2.clone()).await.unwrap();
953 oracle.remove_link(pk2.clone(), pk1.clone()).await.unwrap();
954
955 let msg1 = IoBuf::from(b"attempt 3: hello from pk1");
957 let msg2 = IoBuf::from(b"attempt 3: hello from pk2");
958 sender1.send(Recipients::One(pk2.clone()), msg1.clone(), false);
959 sender2.send(Recipients::One(pk1.clone()), msg2.clone(), false);
960
961 select! {
963 _ = receiver1.recv() => {
964 panic!("unexpected message");
965 },
966 _ = receiver2.recv() => {
967 panic!("unexpected message");
968 },
969 _ = context.sleep(Duration::from_secs(1)) => {},
970 }
971
972 let result = oracle.remove_link(pk1, pk2).await;
974 assert!(matches!(result, Err(Error::LinkMissing)));
975 });
976 }
977
978 async fn test_bandwidth_between_peers(
979 context: &mut deterministic::Context,
980 oracle: &Oracle<PublicKey, deterministic::Context>,
981 index: u64,
982 sender_bps: Option<usize>,
983 receiver_bps: Option<usize>,
984 message_size: usize,
985 expected_duration_ms: u64,
986 ) {
987 let pk1 = PrivateKey::from_seed(context.gen::<u64>()).public_key();
989 let pk2 = PrivateKey::from_seed(context.gen::<u64>()).public_key();
990 let (mut sender, _) = oracle
991 .control(pk1.clone())
992 .register(0, TEST_QUOTA)
993 .await
994 .unwrap();
995 let (_, mut receiver) = oracle
996 .control(pk2.clone())
997 .register(0, TEST_QUOTA)
998 .await
999 .unwrap();
1000 let mut manager = oracle.manager();
1001 manager.track(index, Set::from_iter_dedup([pk1.clone(), pk2.clone()]));
1002
1003 oracle
1005 .limit_bandwidth(pk1.clone(), sender_bps, None)
1006 .await
1007 .unwrap();
1008 oracle
1009 .limit_bandwidth(pk2.clone(), None, receiver_bps)
1010 .await
1011 .unwrap();
1012
1013 oracle
1015 .add_link(
1016 pk1.clone(),
1017 pk2.clone(),
1018 Link {
1019 latency: Duration::ZERO,
1021 jitter: Duration::ZERO,
1022 success_rate: 1.0,
1023 },
1024 )
1025 .await
1026 .unwrap();
1027
1028 let msg = IoBuf::from(vec![42u8; message_size]);
1030 let start = context.current();
1031 sender.send(Recipients::One(pk2.clone()), msg.clone(), true);
1032
1033 let (origin, received) = receiver.recv().await.unwrap();
1035 let elapsed = context.current().duration_since(start).unwrap();
1036
1037 assert_eq!(origin, pk1);
1038 assert_eq!(received, msg);
1039 assert!(
1040 elapsed >= Duration::from_millis(expected_duration_ms),
1041 "Message arrived too quickly: {elapsed:?} (expected >= {expected_duration_ms}ms)"
1042 );
1043 assert!(
1044 elapsed < Duration::from_millis(expected_duration_ms + 100),
1045 "Message took too long: {elapsed:?} (expected ~{expected_duration_ms}ms)"
1046 );
1047 }
1048
1049 #[test]
1050 fn test_bandwidth() {
1051 let executor = deterministic::Runner::default();
1052 executor.start(|mut context| async move {
1053 let (network, oracle) = Network::new(
1054 context.child("network"),
1055 Config {
1056 max_size: 1024 * 1024,
1057 disconnect_on_block: true,
1058 tracked_peer_sets: NZUsize!(1),
1059 },
1060 );
1061 network.start();
1062
1063 test_bandwidth_between_peers(
1066 &mut context,
1067 &oracle,
1068 0,
1069 Some(1000), Some(1000), 500, 500, )
1074 .await;
1075
1076 test_bandwidth_between_peers(
1080 &mut context,
1081 &oracle,
1082 1,
1083 Some(500), Some(2000), 250, 500, )
1088 .await;
1089
1090 test_bandwidth_between_peers(
1094 &mut context,
1095 &oracle,
1096 2,
1097 Some(2000), Some(500), 250, 500, )
1102 .await;
1103
1104 test_bandwidth_between_peers(
1108 &mut context,
1109 &oracle,
1110 3,
1111 None, Some(1000), 500, 500, )
1116 .await;
1117
1118 test_bandwidth_between_peers(
1122 &mut context,
1123 &oracle,
1124 4,
1125 Some(1000), None, 500, 500, )
1130 .await;
1131
1132 test_bandwidth_between_peers(
1135 &mut context,
1136 &oracle,
1137 5,
1138 None, None, 500, 0, )
1143 .await;
1144 });
1145 }
1146
1147 #[test]
1148 fn test_bandwidth_contention() {
1149 let executor = deterministic::Runner::default();
1151 executor.start(|context| async move {
1152 let (network, oracle) = Network::new(
1153 context.child("network"),
1154 Config {
1155 max_size: 1024 * 1024,
1156 disconnect_on_block: true,
1157 tracked_peer_sets: NZUsize!(1),
1158 },
1159 );
1160 network.start();
1161
1162 const NUM_PEERS: usize = 100;
1164 const MESSAGE_SIZE: usize = 1000; const EFFECTIVE_BPS: usize = 10_000; let mut peers = Vec::with_capacity(NUM_PEERS + 1);
1169 let mut senders = Vec::with_capacity(NUM_PEERS + 1);
1170 let mut receivers = Vec::with_capacity(NUM_PEERS + 1);
1171
1172 for i in 0..=NUM_PEERS {
1174 let pk = PrivateKey::from_seed(i as u64).public_key();
1175 let (sender, receiver) = oracle
1176 .control(pk.clone())
1177 .register(0, TEST_QUOTA)
1178 .await
1179 .unwrap();
1180 peers.push(pk);
1181 senders.push(sender);
1182 receivers.push(receiver);
1183 }
1184 track_peers(&oracle, peers.iter().cloned()).await;
1185
1186 for pk in &peers {
1188 oracle
1189 .limit_bandwidth(pk.clone(), Some(EFFECTIVE_BPS), Some(EFFECTIVE_BPS))
1190 .await
1191 .unwrap();
1192 }
1193
1194 for peer in peers.iter().skip(1) {
1196 oracle
1197 .add_link(
1198 peer.clone(),
1199 peers[0].clone(),
1200 Link {
1201 latency: Duration::ZERO,
1202 jitter: Duration::ZERO,
1203 success_rate: 1.0,
1204 },
1205 )
1206 .await
1207 .unwrap();
1208 oracle
1209 .add_link(
1210 peers[0].clone(),
1211 peer.clone(),
1212 Link {
1213 latency: Duration::ZERO,
1214 jitter: Duration::ZERO,
1215 success_rate: 1.0,
1216 },
1217 )
1218 .await
1219 .unwrap();
1220 }
1221
1222 let start = context.current();
1225
1226 let msg = IoBuf::from(vec![0u8; MESSAGE_SIZE]);
1229 for peer in peers.iter().skip(1) {
1230 senders[0].send(Recipients::One(peer.clone()), msg.clone(), true);
1231 }
1232
1233 for receiver in receivers.iter_mut().skip(1) {
1235 let (origin, received) = receiver.recv().await.unwrap();
1236 assert_eq!(origin, peers[0]);
1237 assert_eq!(received.len(), MESSAGE_SIZE);
1238 }
1239
1240 let elapsed = context.current().duration_since(start).unwrap();
1241
1242 let expected_ms = (NUM_PEERS * MESSAGE_SIZE * 1000) / EFFECTIVE_BPS;
1244
1245 assert!(
1246 elapsed >= Duration::from_millis(expected_ms as u64),
1247 "One-to-many completed too quickly: {elapsed:?} (expected >= {expected_ms}ms)"
1248 );
1249 assert!(
1250 elapsed < Duration::from_millis((expected_ms as u64) + 500),
1251 "One-to-many took too long: {elapsed:?} (expected ~{expected_ms}ms)"
1252 );
1253
1254 let start = context.current();
1256
1257 let msg = IoBuf::from(vec![0; MESSAGE_SIZE]);
1260 for mut sender in senders.into_iter().skip(1) {
1261 sender.send(Recipients::One(peers[0].clone()), msg.clone(), true);
1262 }
1263
1264 let mut received_from = HashSet::new();
1266 for _ in 1..=NUM_PEERS {
1267 let (origin, received) = receivers[0].recv().await.unwrap();
1268 assert_eq!(received.len(), MESSAGE_SIZE);
1269 assert!(
1270 received_from.insert(origin.clone()),
1271 "Received duplicate from {origin:?}"
1272 );
1273 }
1274
1275 let elapsed = context.current().duration_since(start).unwrap();
1276
1277 let expected_ms = (NUM_PEERS * MESSAGE_SIZE * 1000) / EFFECTIVE_BPS;
1279
1280 assert!(
1281 elapsed >= Duration::from_millis(expected_ms as u64),
1282 "Many-to-one completed too quickly: {elapsed:?} (expected >= {expected_ms}ms)"
1283 );
1284 assert!(
1285 elapsed < Duration::from_millis((expected_ms as u64) + 500),
1286 "Many-to-one took too long: {elapsed:?} (expected ~{expected_ms}ms)"
1287 );
1288
1289 assert_eq!(received_from.len(), NUM_PEERS);
1291 for peer in peers.iter().skip(1) {
1292 assert!(received_from.contains(peer));
1293 }
1294 });
1295 }
1296
1297 #[test]
1298 fn test_message_ordering() {
1299 let executor = deterministic::Runner::default();
1301 executor.start(|context| async move {
1302 let (network, oracle) = Network::new(
1303 context.child("network"),
1304 Config {
1305 max_size: 1024 * 1024,
1306 disconnect_on_block: true,
1307 tracked_peer_sets: NZUsize!(1),
1308 },
1309 );
1310 network.start();
1311
1312 let pk1 = PrivateKey::from_seed(1).public_key();
1314 let pk2 = PrivateKey::from_seed(2).public_key();
1315 let (mut sender, _) = oracle
1316 .control(pk1.clone())
1317 .register(0, TEST_QUOTA)
1318 .await
1319 .unwrap();
1320 let (_, mut receiver) = oracle
1321 .control(pk2.clone())
1322 .register(0, TEST_QUOTA)
1323 .await
1324 .unwrap();
1325 track_peers(&oracle, [pk1.clone(), pk2.clone()]).await;
1326
1327 oracle
1329 .add_link(
1330 pk1.clone(),
1331 pk2.clone(),
1332 Link {
1333 latency: Duration::from_millis(50),
1334 jitter: Duration::from_millis(40),
1335 success_rate: 1.0,
1336 },
1337 )
1338 .await
1339 .unwrap();
1340
1341 let messages = vec![
1343 IoBuf::from(b"message 1"),
1344 IoBuf::from(b"message 2"),
1345 IoBuf::from(b"message 3"),
1346 IoBuf::from(b"message 4"),
1347 IoBuf::from(b"message 5"),
1348 ];
1349
1350 for msg in messages.clone() {
1351 sender.send(Recipients::One(pk2.clone()), msg, true);
1352 }
1353
1354 for expected_msg in messages {
1356 let (origin, received_msg) = receiver.recv().await.unwrap();
1357 assert_eq!(origin, pk1);
1358 assert_eq!(received_msg, expected_msg);
1359 }
1360 })
1361 }
1362
1363 #[test]
1364 fn test_high_latency_message_blocks_followup() {
1365 let executor = deterministic::Runner::default();
1366 executor.start(|context| async move {
1367 let (network, oracle) = Network::new(
1368 context.child("network"),
1369 Config {
1370 max_size: 1024 * 1024,
1371 disconnect_on_block: true,
1372 tracked_peer_sets: NZUsize!(1),
1373 },
1374 );
1375 network.start();
1376
1377 let pk1 = PrivateKey::from_seed(1).public_key();
1378 let pk2 = PrivateKey::from_seed(2).public_key();
1379 let (mut sender, _) = oracle.control(pk1.clone()).register(0, TEST_QUOTA).await.unwrap();
1380 let (_, mut receiver) = oracle.control(pk2.clone()).register(0, TEST_QUOTA).await.unwrap();
1381 track_peers(&oracle, [pk1.clone(), pk2.clone()]).await;
1382
1383 const BPS: usize = 1_000;
1384 oracle
1385 .limit_bandwidth(pk1.clone(), Some(BPS), None)
1386 .await
1387 .unwrap();
1388 oracle
1389 .limit_bandwidth(pk2.clone(), None, Some(BPS))
1390 .await
1391 .unwrap();
1392
1393 oracle
1395 .add_link(
1396 pk1.clone(),
1397 pk2.clone(),
1398 Link {
1399 latency: Duration::from_millis(5_000),
1400 jitter: Duration::ZERO,
1401 success_rate: 1.0,
1402 },
1403 )
1404 .await
1405 .unwrap();
1406
1407 let egress_time = Duration::from_secs(1);
1408 let start = context.current();
1409 let slow = IoBuf::from(vec![0u8; 1_000]);
1410 sender
1411 .send(Recipients::One(pk2.clone()), slow.clone(), true);
1412
1413 oracle.remove_link(pk1.clone(), pk2.clone()).await.unwrap();
1415 oracle
1416 .add_link(
1417 pk1.clone(),
1418 pk2.clone(),
1419 Link {
1420 latency: Duration::from_millis(1),
1421 jitter: Duration::ZERO,
1422 success_rate: 1.0,
1423 },
1424 )
1425 .await
1426 .unwrap();
1427
1428 let fast = IoBuf::from(vec![1u8; 1_000]);
1430 sender
1431 .send(Recipients::One(pk2.clone()), fast.clone(), true);
1432
1433 let (origin1, message1) = receiver.recv().await.unwrap();
1434 assert_eq!(origin1, pk1);
1435 assert_eq!(message1, slow);
1436 let first_elapsed = context.current().duration_since(start).unwrap();
1437
1438 let (origin2, message2) = receiver.recv().await.unwrap();
1439 let second_elapsed = context.current().duration_since(start).unwrap();
1440 assert_eq!(origin2, pk1);
1441 assert_eq!(message2, fast);
1442
1443 let slow_latency = Duration::from_millis(5_000);
1444 let expected_first = egress_time + slow_latency;
1445 let tolerance = Duration::from_millis(10);
1446 assert!(
1447 first_elapsed >= expected_first.saturating_sub(tolerance)
1448 && first_elapsed <= expected_first + tolerance,
1449 "slow message arrived outside expected window: {first_elapsed:?} (expected {expected_first:?} ± {tolerance:?})"
1450 );
1451 assert!(
1452 second_elapsed >= first_elapsed,
1453 "fast message arrived before slow transmission completed"
1454 );
1455
1456 let arrival_gap = second_elapsed
1457 .checked_sub(first_elapsed)
1458 .expect("timestamps ordered");
1459 assert!(
1460 arrival_gap >= egress_time.saturating_sub(tolerance)
1461 && arrival_gap <= egress_time + tolerance,
1462 "next arrival deviated from transmit duration (gap = {arrival_gap:?}, expected {egress_time:?} ± {tolerance:?})"
1463 );
1464 })
1465 }
1466
/// Ten senders, each with egress limited to 10_000, send one 10_000-byte message to a
/// single receiver whose ingress is limited to 100_000.
///
/// Every message is asserted to arrive between 950ms and 1100ms after the sends are issued.
#[test]
fn test_many_to_one_bandwidth_sharing() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.child("network"), config);
        network.start();

        // Register every sender and cap its egress.
        let mut sender_keys = Vec::new();
        let mut outboxes = Vec::new();
        for i in 0..10 {
            let key = ed25519::PrivateKey::from_seed(i).public_key();
            sender_keys.push(key.clone());
            let (outbox, _) = oracle
                .control(key.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();
            outboxes.push(outbox);

            oracle
                .limit_bandwidth(key.clone(), Some(10_000), None)
                .await
                .unwrap();
        }

        // Register the receiver, track all peers, then cap the receiver's ingress.
        let sink = ed25519::PrivateKey::from_seed(100).public_key();
        let (_, mut inbox) = oracle
            .control(sink.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        track_peers(&oracle, sender_keys.iter().cloned().chain([sink.clone()])).await;

        oracle
            .limit_bandwidth(sink.clone(), None, Some(100_000))
            .await
            .unwrap();

        // Link each sender to the receiver with no latency, no jitter and no loss.
        let instant_link = || Link {
            latency: Duration::ZERO,
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        for key in sender_keys.iter() {
            oracle
                .add_link(key.clone(), sink.clone(), instant_link())
                .await
                .unwrap();
        }

        let start = context.current();

        // Each sender issues one message tagged with its index.
        for (i, mut outbox) in outboxes.into_iter().enumerate() {
            let payload = IoBuf::from(vec![i as u8; 10_000]);
            outbox.send(Recipients::One(sink.clone()), payload, true);
        }

        // All ten messages must land inside the same ~1s window.
        let window = Duration::from_millis(950)..=Duration::from_millis(1100);
        for i in 0..10 {
            let (_, _msg) = inbox.recv().await.unwrap();
            let recv_time = context.current().duration_since(start).unwrap();

            assert!(
                window.contains(&recv_time),
                "Message {i} received at {recv_time:?}, expected ~1s",
            );
        }
    });
}
1555
/// One sender with egress limited to 100_000 sends a 10_000-byte message to each of ten
/// receivers whose ingress is limited to 10_000.
///
/// Each receiver must get the message tagged with its own index, between 950ms and 1100ms
/// after the sends are issued.
#[test]
fn test_one_to_many_fast_sender() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.child("network"), config);
        network.start();

        // Register the sender and cap its egress.
        let source = ed25519::PrivateKey::from_seed(0).public_key();
        let (mut sender_tx, _) = oracle
            .control(source.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        oracle
            .limit_bandwidth(source.clone(), Some(100_000), None)
            .await
            .unwrap();

        // Register each receiver (seeds 1 through 10), cap its ingress and link it to the sender.
        let instant_link = || Link {
            latency: Duration::ZERO,
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        let mut receiver_keys = Vec::new();
        let mut inboxes = Vec::new();
        for seed in 1..=10 {
            let key = ed25519::PrivateKey::from_seed(seed).public_key();
            receiver_keys.push(key.clone());
            let (_, inbox) = oracle
                .control(key.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();
            inboxes.push(inbox);

            oracle
                .limit_bandwidth(key.clone(), None, Some(10_000))
                .await
                .unwrap();

            oracle
                .add_link(source.clone(), key.clone(), instant_link())
                .await
                .unwrap();
        }
        track_peers(
            &oracle,
            core::iter::once(source.clone()).chain(receiver_keys.iter().cloned()),
        )
        .await;

        let start = context.current();

        // One message per receiver, tagged with the receiver's index.
        for (i, key) in receiver_keys.iter().enumerate() {
            let payload = IoBuf::from(vec![i as u8; 10_000]);
            sender_tx.send(Recipients::One(key.clone()), payload, true);
        }

        // Drain the receivers in index order and check both the tag and the arrival time.
        let window = Duration::from_millis(950)..=Duration::from_millis(1100);
        for (i, mut inbox) in inboxes.into_iter().enumerate() {
            let (_, msg) = inbox.recv().await.unwrap();
            assert_eq!(msg.as_ref()[0], i as u8);
            let recv_time = context.current().duration_since(start).unwrap();

            assert!(
                window.contains(&recv_time),
                "Receiver {i} received at {recv_time:?}, expected ~1s",
            );
        }
    });
}
1648
/// Ten senders, each with egress limited to 1_000, send one 1_000-byte message to a
/// receiver whose ingress is limited to 10_000.
///
/// Every message is asserted to arrive between 950ms and 1100ms after the sends are issued.
#[test]
fn test_many_slow_senders_to_fast_receiver() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.child("network"), config);
        network.start();

        // Register every sender and cap its egress.
        let mut sender_keys = Vec::new();
        let mut outboxes = Vec::new();
        for i in 0..10 {
            let key = ed25519::PrivateKey::from_seed(i).public_key();
            sender_keys.push(key.clone());
            let (outbox, _) = oracle
                .control(key.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();
            outboxes.push(outbox);

            oracle
                .limit_bandwidth(key.clone(), Some(1_000), None)
                .await
                .unwrap();
        }

        // Register the receiver, track all peers, then cap the receiver's ingress.
        let sink = ed25519::PrivateKey::from_seed(100).public_key();
        let (_, mut inbox) = oracle
            .control(sink.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        track_peers(&oracle, sender_keys.iter().cloned().chain([sink.clone()])).await;

        oracle
            .limit_bandwidth(sink.clone(), None, Some(10_000))
            .await
            .unwrap();

        // Link each sender to the receiver with no latency, no jitter and no loss.
        let instant_link = || Link {
            latency: Duration::ZERO,
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        for key in sender_keys.iter() {
            oracle
                .add_link(key.clone(), sink.clone(), instant_link())
                .await
                .unwrap();
        }

        let start = context.current();

        // Each sender issues one message tagged with its index.
        for (i, mut outbox) in outboxes.into_iter().enumerate() {
            let payload = IoBuf::from(vec![i as u8; 1_000]);
            outbox.send(Recipients::One(sink.clone()), payload, true);
        }

        // All ten messages must land inside the same ~1s window.
        let window = Duration::from_millis(950)..=Duration::from_millis(1100);
        for i in 0..10 {
            let (_, _msg) = inbox.recv().await.unwrap();
            let recv_time = context.current().duration_since(start).unwrap();

            assert!(
                window.contains(&recv_time),
                "Message {i} received at {recv_time:?}, expected ~1s",
            );
        }
    });
}
1743
/// Three senders (egress limited to 30_000 each) send to one receiver (ingress limited to
/// 30_000) at staggered times over links with 1ms latency:
///
/// - sender 0 sends 30_000 bytes immediately,
/// - sender 1 sends 30_000 bytes after sleeping 500ms,
/// - sender 2 sends 15_000 bytes after sleeping 1500ms.
///
/// Message 0 must arrive first, between 1490ms and 1600ms. Messages 1 and 2 may arrive in
/// either order, each between 1500ms and 2600ms.
#[test]
fn test_dynamic_bandwidth_allocation_staggered() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.child("network"), config);
        network.start();

        // Register the three senders and cap their egress.
        let mut sender_keys = Vec::new();
        let mut outboxes = Vec::new();
        for i in 0..3 {
            let key = ed25519::PrivateKey::from_seed(i).public_key();
            sender_keys.push(key.clone());
            let (outbox, _) = oracle
                .control(key.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();
            outboxes.push(outbox);

            oracle
                .limit_bandwidth(key.clone(), Some(30_000), None)
                .await
                .unwrap();
        }

        // Register the receiver, track all peers, then cap the receiver's ingress.
        let receiver = ed25519::PrivateKey::from_seed(100).public_key();
        let (_, mut inbox) = oracle
            .control(receiver.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        track_peers(&oracle, sender_keys.iter().cloned().chain([receiver.clone()])).await;
        oracle
            .limit_bandwidth(receiver.clone(), None, Some(30_000))
            .await
            .unwrap();

        // Link every sender to the receiver with 1ms latency.
        for key in sender_keys.iter() {
            let link = Link {
                latency: Duration::from_millis(1),
                jitter: Duration::ZERO,
                success_rate: 1.0,
            };
            oracle
                .add_link(key.clone(), receiver.clone(), link)
                .await
                .unwrap();
        }

        let start = context.current();
        let mut pending = outboxes.into_iter();

        // Sender 0: sends right away.
        let mut tx0 = pending.next().expect("missing sender 0");
        let dest = receiver.clone();
        context.child("task").spawn(move |_| async move {
            let payload = IoBuf::from(vec![0u8; 30_000]);
            tx0.send(Recipients::One(dest), payload, true);
        });

        // Sender 1: sends after 500ms.
        let mut tx1 = pending.next().expect("missing sender 1");
        let dest = receiver.clone();
        context.child("task").spawn(move |context| async move {
            context.sleep(Duration::from_millis(500)).await;
            let payload = IoBuf::from(vec![1u8; 30_000]);
            tx1.send(Recipients::One(dest), payload, true);
        });

        // Sender 2: sends a smaller message after 1500ms.
        let mut tx2 = pending.next().expect("missing sender 2");
        let dest = receiver.clone();
        context.child("task").spawn(move |context| async move {
            context.sleep(Duration::from_millis(1500)).await;
            let payload = IoBuf::from(vec![2u8; 15_000]);
            tx2.send(Recipients::One(dest), payload, true);
        });

        // The first arrival must be message 0.
        let (_, msg0) = inbox.recv().await.unwrap();
        assert_eq!(msg0.as_ref()[0], 0);
        let t0 = context.current().duration_since(start).unwrap();
        let first_window = Duration::from_millis(1490)..=Duration::from_millis(1600);
        assert!(
            first_window.contains(&t0),
            "Message 0 received at {t0:?}, expected ~1.5s",
        );

        // The remaining two arrivals may come in either order; sort them by tag.
        let (_, second) = inbox.recv().await.unwrap();
        let second_at = context.current().duration_since(start).unwrap();

        let (_, third) = inbox.recv().await.unwrap();
        let third_at = context.current().duration_since(start).unwrap();

        let second_is_msg1 = second.as_ref()[0] == 1;
        let ((msg1, t1), (msg2, t2)) = if second_is_msg1 {
            ((second, second_at), (third, third_at))
        } else {
            ((third, third_at), (second, second_at))
        };

        assert_eq!(msg1.as_ref()[0], 1);
        assert_eq!(msg2.as_ref()[0], 2);

        let later_window = Duration::from_millis(1500)..=Duration::from_millis(2600);
        assert!(
            later_window.contains(&t1),
            "Message 1 received at {t1:?}, expected between 1.5s-2.6s",
        );

        assert!(
            later_window.contains(&t2),
            "Message 2 received at {t2:?}, expected between 1.5s-2.6s",
        );
    });
}
1891
/// Three senders with no bandwidth limit send messages of 10_000, 20_000 and 30_000 bytes
/// at the same time to one receiver whose ingress is limited to 30_000, over links with 1ms
/// latency.
///
/// Arrival order is not asserted. The test checks that each sender's message is delivered
/// exactly once with the length that sender sent, and that the last arrival is no earlier
/// than 2s (60KB through a 30KB/s ingress limit).
#[test]
fn test_dynamic_bandwidth_varied_sizes() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let (network, oracle) = Network::new(
            context.child("network"),
            Config {
                max_size: 1024 * 1024,
                disconnect_on_block: true,
                tracked_peer_sets: NZUsize!(1),
            },
        );
        network.start();

        // Register the senders and explicitly leave their bandwidth unlimited.
        let mut senders = Vec::new();
        let mut sender_txs = Vec::new();
        for i in 0..3 {
            let sender = ed25519::PrivateKey::from_seed(i).public_key();
            senders.push(sender.clone());
            let (tx, _) = oracle
                .control(sender.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();
            sender_txs.push(tx);

            oracle
                .limit_bandwidth(sender.clone(), None, None)
                .await
                .unwrap();
        }

        // Register the receiver, track all peers, then cap the receiver's ingress.
        let receiver = ed25519::PrivateKey::from_seed(100).public_key();
        let (_, mut receiver_rx) = oracle
            .control(receiver.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        track_peers(&oracle, senders.iter().cloned().chain([receiver.clone()])).await;
        oracle
            .limit_bandwidth(receiver.clone(), None, Some(30_000))
            .await
            .unwrap();

        // Link every sender to the receiver with 1ms latency.
        for sender in &senders {
            oracle
                .add_link(
                    sender.clone(),
                    receiver.clone(),
                    Link {
                        latency: Duration::from_millis(1),
                        jitter: Duration::ZERO,
                        success_rate: 1.0,
                    },
                )
                .await
                .unwrap();
        }

        let start = context.current();

        // Sender `i` sends `sizes[i]` bytes, every byte set to `i`.
        let sizes: [usize; 3] = [10_000, 20_000, 30_000];
        for (i, (mut tx, size)) in sender_txs.into_iter().zip(sizes.iter()).enumerate() {
            let rx_clone = receiver.clone();
            let msg_size = *size;
            let msg = IoBuf::from(vec![i as u8; msg_size]);
            tx.send(Recipients::One(rx_clone), msg, true);
        }

        // Record (sender index, payload length, arrival time) for each delivery.
        let mut messages = Vec::new();
        for _ in 0..3 {
            let (_, msg) = receiver_rx.recv().await.unwrap();
            let t = context.current().duration_since(start).unwrap();
            messages.push((msg.as_ref()[0] as usize, msg.len(), t));
        }

        // Every sender must have delivered exactly one message...
        let mut ids: Vec<usize> = messages.iter().map(|&(id, _, _)| id).collect();
        ids.sort_unstable();
        assert_eq!(
            ids,
            vec![0, 1, 2],
            "expected exactly one message from each sender"
        );

        // ...and each message must have the length its sender sent. Indexing is safe:
        // the check above guarantees every id is in 0..3.
        for &(id, len, _) in &messages {
            assert_eq!(
                len, sizes[id],
                "message from sender {id} has unexpected length"
            );
        }

        // Total transfer time is bounded below by the receiver's ingress limit.
        let max_time = messages.iter().map(|&(_, _, t)| t).max().unwrap();
        assert!(
            max_time >= Duration::from_millis(2000),
            "Total time {max_time:?} should be at least 2s for 60KB at 30KB/s",
        );
    });
}
1996
/// A sender (egress limited to 1000) sends three 500-byte messages back to back to a
/// receiver (ingress limited to 1000) over a link with 1s latency.
///
/// The messages must arrive in send order, with message `k` (zero-based) arriving in the
/// half-open window `[1500 + 500k, 1600 + 500k)` milliseconds after the sends are issued.
#[test]
fn test_bandwidth_pipe_reservation_duration() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.child("network"), config);
        network.start();

        let source = PrivateKey::from_seed(1).public_key();
        let sink = PrivateKey::from_seed(2).public_key();

        let (mut outbox, _) = oracle
            .control(source.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (_, mut inbox) = oracle
            .control(sink.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        track_peers(&oracle, [source.clone(), sink.clone()]).await;

        // Cap both ends of the pipe at the same rate.
        oracle
            .limit_bandwidth(source.clone(), Some(1000), None)
            .await
            .unwrap();
        oracle
            .limit_bandwidth(sink.clone(), None, Some(1000))
            .await
            .unwrap();

        // A lossless, jitter-free link with a full second of latency.
        let link = Link {
            latency: Duration::from_secs(1),
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        oracle
            .add_link(source.clone(), sink.clone(), link)
            .await
            .unwrap();

        let start = context.current();

        // Queue three messages at once, each filled with its sequence number.
        for tag in 0..3 {
            let payload = IoBuf::from(vec![tag; 500]);
            outbox.send(Recipients::One(sink.clone()), payload, false);
        }

        // Collect arrival times while checking that the messages arrive in send order.
        let mut arrivals = Vec::new();
        for tag in 0..3 {
            let (_, received) = inbox.recv().await.unwrap();
            arrivals.push(context.current().duration_since(start).unwrap());
            assert_eq!(received.as_ref()[0], tag);
        }

        // Each arrival must fall in its own 100ms window, 500ms after the previous one.
        for (idx, arrived) in arrivals.iter().enumerate() {
            let lower_ms = 1500 + 500 * idx as u64;
            let window =
                Duration::from_millis(lower_ms)..Duration::from_millis(lower_ms + 100);

            assert!(
                window.contains(arrived),
                "Message {} should arrive at ~{}ms, got {:?}",
                idx + 1,
                lower_ms,
                arrived
            );
        }
    });
}
2098
/// Checks that lowering a sender's egress limit between transfers slows the next transfer.
///
/// With sender egress and receiver ingress both limited to 10_000 and a 1ms link, a
/// 20_000-byte message must arrive between 1999ms and 2010ms. After the sender's egress is
/// lowered to 2_000, a 10_000-byte message must arrive between 4999ms and 5010ms after it
/// was sent.
#[test]
fn test_dynamic_bandwidth_affects_new_transfers() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let (network, oracle) = Network::new(
            context.child("network"),
            Config {
                max_size: 1024 * 1024,
                disconnect_on_block: true,
                tracked_peer_sets: NZUsize!(1),
            },
        );
        network.start();

        let pk_sender = PrivateKey::from_seed(1).public_key();
        let pk_receiver = PrivateKey::from_seed(2).public_key();

        // Register both peers, track them and connect them with a 1ms link.
        let (mut sender_tx, _) = oracle
            .control(pk_sender.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (_, mut receiver_rx) = oracle
            .control(pk_receiver.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        track_peers(&oracle, [pk_sender.clone(), pk_receiver.clone()]).await;
        oracle
            .add_link(
                pk_sender.clone(),
                pk_receiver.clone(),
                Link {
                    latency: Duration::from_millis(1),
                    jitter: Duration::ZERO,
                    success_rate: 1.0,
                },
            )
            .await
            .unwrap();

        // Initial limits: both ends at 10_000.
        oracle
            .limit_bandwidth(pk_sender.clone(), Some(10_000), None)
            .await
            .unwrap();
        oracle
            .limit_bandwidth(pk_receiver.clone(), None, Some(10_000))
            .await
            .unwrap();

        // First transfer at the initial rate. The buffer is moved into `send`; it is not
        // needed afterwards, so no clone is made.
        let msg1 = IoBuf::from(vec![1u8; 20_000]);
        let start_time = context.current();
        sender_tx.send(Recipients::One(pk_receiver.clone()), msg1, false);

        let (_sender, received_msg1) = receiver_rx.recv().await.unwrap();
        let msg1_time = context.current().duration_since(start_time).unwrap();
        assert_eq!(received_msg1.len(), 20_000);
        assert!(
            msg1_time >= Duration::from_millis(1999)
                && msg1_time <= Duration::from_millis(2010),
            "First message should take ~2s, got {msg1_time:?}",
        );

        // Lower the sender's egress limit before starting the second transfer.
        oracle
            .limit_bandwidth(pk_sender.clone(), Some(2_000), None)
            .await
            .unwrap();

        // Second transfer, timed from its own send.
        let msg2 = IoBuf::from(vec![2u8; 10_000]);
        let msg2_start = context.current();
        sender_tx.send(Recipients::One(pk_receiver.clone()), msg2, false);

        let (_sender, received_msg2) = receiver_rx.recv().await.unwrap();
        let msg2_time = context.current().duration_since(msg2_start).unwrap();
        assert_eq!(received_msg2.len(), 10_000);
        assert!(
            msg2_time >= Duration::from_millis(4999)
                && msg2_time <= Duration::from_millis(5010),
            "Second message should take ~5s at reduced bandwidth, got {msg2_time:?}",
        );
    });
}
2190
/// Checks that a receiver with ingress limited to 0 receives nothing until the limit is
/// lifted.
///
/// `send` still reports the receiver as a recipient, but no message may arrive within 10s.
/// Once the receiver's limits are cleared, a message must arrive within 1s.
#[test]
fn test_zero_receiver_ingress_bandwidth() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let (network, oracle) = Network::new(
            context.child("network"),
            Config {
                max_size: 1024 * 1024,
                disconnect_on_block: true,
                tracked_peer_sets: NZUsize!(1),
            },
        );
        network.start();

        let pk_sender = PrivateKey::from_seed(1).public_key();
        let pk_receiver = PrivateKey::from_seed(2).public_key();

        // Register both peers, track them and connect them with a zero-latency link.
        let (mut sender_tx, _) = oracle
            .control(pk_sender.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (_, mut receiver_rx) = oracle
            .control(pk_receiver.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        track_peers(&oracle, [pk_sender.clone(), pk_receiver.clone()]).await;
        oracle
            .add_link(
                pk_sender.clone(),
                pk_receiver.clone(),
                Link {
                    latency: Duration::ZERO,
                    jitter: Duration::ZERO,
                    success_rate: 1.0,
                },
            )
            .await
            .unwrap();

        // Set the receiver's ingress limit to zero.
        oracle
            .limit_bandwidth(pk_receiver.clone(), None, Some(0))
            .await
            .unwrap();

        // The send is accepted and names the receiver. The buffer is moved into `send`;
        // it is not needed afterwards, so no clone is made.
        let msg1 = IoBuf::from(vec![1u8; 20_000]);
        let sent = sender_tx.send(Recipients::One(pk_receiver.clone()), msg1, false);
        assert_eq!(sent.len(), 1);
        assert_eq!(sent[0], pk_receiver);

        // Nothing may be delivered while the limit is zero.
        select! {
            _ = receiver_rx.recv() => {
                panic!("unexpected message");
            },
            _ = context.sleep(Duration::from_secs(10)) => {},
        }

        // Clear the receiver's limits.
        oracle
            .limit_bandwidth(pk_receiver.clone(), None, None)
            .await
            .unwrap();

        // A message must now arrive within a second.
        select! {
            _ = receiver_rx.recv() => {},
            _ = context.sleep(Duration::from_secs(1)) => {
                panic!("timeout");
            },
        }
    });
}
2268
/// Checks that a sender with egress limited to 0 delivers nothing until the limit is
/// lifted.
///
/// `send` still reports the receiver as a recipient, but no message may arrive within 10s.
/// Once the sender's limits are cleared, a message must arrive within 1s.
#[test]
fn test_zero_sender_egress_bandwidth() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let (network, oracle) = Network::new(
            context.child("network"),
            Config {
                max_size: 1024 * 1024,
                disconnect_on_block: true,
                tracked_peer_sets: NZUsize!(1),
            },
        );
        network.start();

        let pk_sender = PrivateKey::from_seed(1).public_key();
        let pk_receiver = PrivateKey::from_seed(2).public_key();

        // Register both peers, track them and connect them with a zero-latency link.
        let (mut sender_tx, _) = oracle
            .control(pk_sender.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (_, mut receiver_rx) = oracle
            .control(pk_receiver.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        track_peers(&oracle, [pk_sender.clone(), pk_receiver.clone()]).await;
        oracle
            .add_link(
                pk_sender.clone(),
                pk_receiver.clone(),
                Link {
                    latency: Duration::ZERO,
                    jitter: Duration::ZERO,
                    success_rate: 1.0,
                },
            )
            .await
            .unwrap();

        // Set the sender's egress limit to zero.
        oracle
            .limit_bandwidth(pk_sender.clone(), Some(0), None)
            .await
            .unwrap();

        // The send is accepted and names the receiver. The buffer is moved into `send`;
        // it is not needed afterwards, so no clone is made.
        let msg1 = IoBuf::from(vec![1u8; 20_000]);
        let sent = sender_tx.send(Recipients::One(pk_receiver.clone()), msg1, false);
        assert_eq!(sent.len(), 1);
        assert_eq!(sent[0], pk_receiver);

        // Nothing may be delivered while the limit is zero.
        select! {
            _ = receiver_rx.recv() => {
                panic!("unexpected message");
            },
            _ = context.sleep(Duration::from_secs(10)) => {},
        }

        // Clear the sender's limits.
        oracle
            .limit_bandwidth(pk_sender.clone(), None, None)
            .await
            .unwrap();

        // A message must now arrive within a second.
        select! {
            _ = receiver_rx.recv() => {},
            _ = context.sleep(Duration::from_secs(1)) => {
                panic!("timeout");
            },
        }
    });
}
2346
/// Checks that an untracked index has no peer set and that a set tracked through the
/// manager is returned as primary peers.
#[test]
fn register_peer_set() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let (network, oracle) = Network::new(context.child("network"), config);
        network.start();

        // Nothing has been tracked yet.
        let mut manager = oracle.manager();
        let untracked = manager.peer_set(0).await;
        assert_eq!(untracked, None);

        // Track a two-peer set and read it back.
        let pk1 = PrivateKey::from_seed(1).public_key();
        let pk2 = PrivateKey::from_seed(2).public_key();
        let members: Set<PublicKey> = Set::try_from([pk1, pk2]).unwrap();
        manager.track(0xFF, members.clone());

        let stored = manager.peer_set(0xFF).await.unwrap();
        assert_eq!(stored, TrackedPeers::primary(members));
    });
}
2374
/// Exercises the socket manager: tracking peers with addresses, reading a peer set back,
/// and observing subscription updates.
///
/// After tracking `{pk1, pk2}` at index 1 and `{pk2}` at index 2, the subscription must
/// yield an update for each index. `latest` holds that index's peers, `all.primary` holds
/// both peers, and the secondary sets are empty.
#[test]
fn test_socket_manager() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let (network, oracle) = Network::new(context.child("network"), config);
        network.start();

        let pk1 = PrivateKey::from_seed(1).public_key();
        let pk2 = PrivateKey::from_seed(2).public_key();
        let addr1: Address = SocketAddr::from(([127, 0, 0, 1], 4000)).into();
        let addr2: Address = SocketAddr::from(([127, 0, 0, 1], 4001)).into();
        let both = vec![pk1.clone(), pk2.clone()];

        // Track both peers at index 1.
        let mut manager = oracle.socket_manager();
        let first_set = Map::<_, Address>::try_from([
            (pk1.clone(), addr1.clone()),
            (pk2.clone(), addr2.clone()),
        ])
        .unwrap();
        manager.track(1, first_set);

        let peer_set = manager.peer_set(1).await.expect("peer set missing");
        let keys: Vec<_> = Vec::from(peer_set.primary.clone());
        assert_eq!(keys, both);

        // Subscribing after the first track still yields the index-1 update.
        let mut subscription = manager.subscribe().await;
        let update = subscription.recv().await.unwrap();
        assert_eq!(update.index, 1);
        let latest_keys: Vec<_> = Vec::from(update.latest.primary.clone());
        assert_eq!(latest_keys, both);
        assert!(update.latest.secondary.is_empty());
        let all_primary_keys: Vec<_> = Vec::from(update.all.primary.clone());
        assert_eq!(all_primary_keys, both);
        assert!(update.all.secondary.is_empty());

        // Track only pk2 at index 2.
        let second_set = Map::<_, Address>::try_from([(pk2.clone(), addr2)]).unwrap();
        manager.track(2, second_set);

        // `latest` reflects only index 2; `all` still includes pk1.
        let update = subscription.recv().await.unwrap();
        assert_eq!(update.index, 2);
        let latest_keys: Vec<_> = Vec::from(update.latest.primary);
        assert_eq!(latest_keys, vec![pk2.clone()]);
        assert!(update.latest.secondary.is_empty());
        let all_primary_keys: Vec<_> = Vec::from(update.all.primary);
        assert_eq!(all_primary_keys, both);
        assert!(update.all.secondary.is_empty());
    });
}
2433
/// Checks that the manager accepts a `TrackedPeers` value with disjoint primary and
/// secondary sets and returns the same value from `peer_set`.
#[test]
fn test_manager_track_accepts_tracked_peers() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let (network, oracle) = Network::new(context.child("network"), config);
        network.start();

        let pk1 = PrivateKey::from_seed(1).public_key();
        let pk2 = PrivateKey::from_seed(2).public_key();
        let mut manager = oracle.manager();

        // pk1 is primary, pk2 is secondary.
        let primary: Set<PublicKey> = Set::try_from([pk1]).unwrap();
        let secondary: Set<PublicKey> = Set::try_from([pk2]).unwrap();
        manager.track(7, TrackedPeers::new(primary.clone(), secondary.clone()));

        // The stored set must match what was tracked.
        let stored = manager.peer_set(7).await.unwrap();
        assert_eq!(stored, TrackedPeers::new(primary, secondary));
    });
}
2469
/// Checks that a peer listed in both the primary and the secondary set is kept only as
/// primary.
///
/// Tracking primary `{pk1, pk2}` and secondary `{pk2, pk3}` must be stored as primary
/// `{pk1, pk2}` and secondary `{pk3}`, and the subscription update must agree.
#[test]
fn test_manager_track_tracked_peers_overlap_primary_wins() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let (network, oracle) = Network::new(context.child("network"), config);
        network.start();

        let pk1 = PrivateKey::from_seed(1).public_key();
        let pk2 = PrivateKey::from_seed(2).public_key();
        let pk3 = PrivateKey::from_seed(3).public_key();
        let mut manager = oracle.manager();

        // pk2 appears on both sides of the tracked value.
        let overlapping = TrackedPeers::new(
            Set::try_from([pk1.clone(), pk2.clone()]).unwrap(),
            Set::try_from([pk2.clone(), pk3.clone()]).unwrap(),
        );
        manager.track(9, overlapping);

        // The stored secondary set no longer contains pk2.
        let deduplicated = TrackedPeers::new(
            Set::try_from([pk1.clone(), pk2.clone()]).unwrap(),
            Set::try_from([pk3.clone()]).unwrap(),
        );
        let stored = manager.peer_set(9).await.unwrap();
        assert_eq!(stored, deduplicated);

        // The subscription update reports the same split.
        let mut subscription = manager.subscribe().await;
        let update = subscription.recv().await.unwrap();
        assert_eq!(update.index, 9);
        assert!(update.latest.primary.position(&pk2).is_some());
        assert!(update.latest.secondary.position(&pk2).is_none());
        assert!(update.latest.secondary.position(&pk3).is_some());
        assert!(update.all.secondary.position(&pk2).is_none());
        assert!(update.all.primary.position(&pk2).is_some());
    });
}
2517
/// Checks that the socket manager accepts an `AddressableTrackedPeers` value and that
/// `peer_set` returns the matching primary and secondary key sets.
#[test]
fn test_socket_manager_track_accepts_addressable_tracked_peers() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let (network, oracle) = Network::new(context.child("network"), config);
        network.start();

        let pk1 = PrivateKey::from_seed(1).public_key();
        let pk2 = PrivateKey::from_seed(2).public_key();
        let addr1: Address = SocketAddr::from(([127, 0, 0, 1], 4000)).into();
        let addr2: Address = SocketAddr::from(([127, 0, 0, 1], 4001)).into();
        let mut manager = oracle.socket_manager();

        // pk1 is primary, pk2 is secondary, each with its own address.
        let primary = Map::<_, Address>::try_from([(pk1.clone(), addr1)]).unwrap();
        let secondary = Map::<_, Address>::try_from([(pk2.clone(), addr2)]).unwrap();
        manager.track(7, AddressableTrackedPeers::new(primary, secondary));

        // Reading back yields only the keys, split the same way.
        let stored = manager.peer_set(7).await.unwrap();
        let expected = TrackedPeers::new(
            Set::try_from([pk1]).unwrap(),
            Set::try_from([pk2]).unwrap(),
        );
        assert_eq!(stored, expected);
    });
}
2555
/// Checks that when the same key is tracked as both primary and secondary (with different
/// addresses), the subscription update lists it only as primary.
#[test]
fn test_socket_manager_track_addressable_overlap_primary_wins() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let (network, oracle) = Network::new(context.child("network"), config);
        network.start();

        let pk = PrivateKey::from_seed(1).public_key();
        let addr_primary: Address = SocketAddr::from(([127, 0, 0, 1], 4000)).into();
        let addr_secondary: Address = SocketAddr::from(([127, 0, 0, 1], 5000)).into();
        let mut manager = oracle.socket_manager();

        // Subscribe before tracking.
        let mut subscription = manager.subscribe().await;

        // The same key sits in both maps.
        let primary = Map::<_, Address>::try_from([(pk.clone(), addr_primary.clone())]).unwrap();
        let secondary = Map::<_, Address>::try_from([(pk.clone(), addr_secondary)]).unwrap();
        manager.track(11, AddressableTrackedPeers::new(primary, secondary));

        // Only the primary entry survives.
        let update = subscription.recv().await.unwrap();
        assert_eq!(update.index, 11);
        assert_eq!(update.latest.primary.len(), 1);
        assert!(update.latest.secondary.is_empty());
        assert!(update.all.secondary.is_empty());
        let only_pk = Set::try_from([pk.clone()]).unwrap();
        assert_eq!(update.latest.primary, only_pk);
    });
}
2593
/// Checks that the socket manager tracks peers whose addresses are `Address::Asymmetric`,
/// one with a socket ingress and one with a DNS ingress.
///
/// Both keys must show up in the stored peer set and in the subscription update.
#[test]
fn test_socket_manager_with_asymmetric_addresses() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let (network, oracle) = Network::new(context.child("network"), config);
        network.start();

        let pk1 = PrivateKey::from_seed(1).public_key();
        let pk2 = PrivateKey::from_seed(2).public_key();
        let both = vec![pk1.clone(), pk2.clone()];

        // Socket ingress with a different egress address.
        let socket_ingress = Address::Asymmetric {
            ingress: Ingress::Socket(SocketAddr::from(([10, 0, 0, 1], 8080))),
            egress: SocketAddr::from(([192, 168, 1, 1], 9090)),
        };
        // DNS ingress with a socket egress address.
        let dns_ingress = Address::Asymmetric {
            ingress: Ingress::Dns {
                host: hostname!("node2.example.com"),
                port: 8080,
            },
            egress: SocketAddr::from(([192, 168, 1, 2], 9090)),
        };

        let mut manager = oracle.socket_manager();
        let peers =
            Map::<_, Address>::try_from([(pk1.clone(), socket_ingress), (pk2.clone(), dns_ingress)])
                .unwrap();
        manager.track(1, peers);

        // The stored peer set contains both keys.
        let peer_set = manager.peer_set(1).await.expect("peer set missing");
        let keys: Vec<_> = Vec::from(peer_set.primary);
        assert_eq!(keys, both);

        // The subscription update contains them too, with no secondary peers.
        let mut subscription = manager.subscribe().await;
        let update = subscription.recv().await.unwrap();
        assert_eq!(update.index, 1);
        let latest_keys: Vec<_> = Vec::from(update.latest.primary);
        assert_eq!(latest_keys, both);
        assert!(update.latest.secondary.is_empty());
    });
}
2644
/// Checks how `Recipients::All` and `peer_set` change as peer sets are tracked with
/// `tracked_peer_sets` set to 2.
///
/// Sets `{pk1, pk2}`, `{pk2, pk3}` and `{pk3, pk4}` are tracked at indices 1, 2 and 3.
/// After the third set, index 1 is no longer returned by `peer_set` and pk1 is no longer a
/// recipient for pk2.
#[test]
fn test_peer_set_window_management() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(2),
        };
        let (network, oracle) = Network::new(context.child("network"), config);
        network.start();

        let pk1 = PrivateKey::from_seed(1).public_key();
        let pk2 = PrivateKey::from_seed(2).public_key();
        let pk3 = PrivateKey::from_seed(3).public_key();
        let pk4 = PrivateKey::from_seed(4).public_key();

        // Track the first set before any peer registers.
        let mut manager = oracle.manager();
        manager.track(1, Set::try_from(vec![pk1.clone(), pk2.clone()]).unwrap());

        // Register all four peers; the receivers are kept alive for the whole test.
        let (mut sender1, _receiver1) = oracle
            .control(pk1.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (mut sender2, _receiver2) = oracle
            .control(pk2.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (mut sender3, _receiver3) = oracle
            .control(pk3.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (_sender4, _receiver4) = oracle
            .control(pk4.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        // Link every ordered pair of distinct peers.
        let everyone = [pk1.clone(), pk2.clone(), pk3.clone(), pk4.clone()];
        for from in everyone.iter() {
            for to in everyone.iter() {
                if from == to {
                    continue;
                }
                let link = Link {
                    latency: Duration::from_millis(1),
                    jitter: Duration::ZERO,
                    success_rate: 1.0,
                };
                oracle
                    .add_link(from.clone(), to.clone(), link)
                    .await
                    .unwrap();
            }
        }

        // With only set 1 tracked, pk1 reaches pk2 and nobody else.
        let recipients = sender1.check(Recipients::All).unwrap().recipients();
        assert_eq!(recipients, vec![pk2.clone()]);
        assert!(!recipients.contains(&pk3));

        // Tracking set 2 makes pk3 reachable from pk1.
        manager.track(2, Set::try_from(vec![pk2.clone(), pk3.clone()]).unwrap());
        assert!(manager.peer_set(2).await.is_some());

        let recipients = sender1.check(Recipients::All).unwrap().recipients();
        assert!(recipients.contains(&pk3));

        // Tracking set 3 exceeds the configured number of tracked sets.
        manager.track(3, Set::try_from(vec![pk3.clone(), pk4.clone()]).unwrap());
        assert!(manager.peer_set(3).await.is_some());

        // pk2 no longer reaches pk1 but still reaches pk3.
        let recipients = sender2.check(Recipients::All).unwrap().recipients();
        assert!(!recipients.contains(&pk1));

        assert!(recipients.contains(&pk3));

        // pk3 reaches pk4.
        let recipients = sender3.check(Recipients::All).unwrap().recipients();
        assert!(recipients.contains(&pk4));

        // Sets 2 and 3 are still stored with their members.
        let peer_set_2 = manager.peer_set(2).await.unwrap();
        assert!(peer_set_2.primary.position(&pk2).is_some());
        assert!(peer_set_2.primary.position(&pk3).is_some());

        let peer_set_3 = manager.peer_set(3).await.unwrap();
        assert!(peer_set_3.primary.position(&pk3).is_some());
        assert!(peer_set_3.primary.position(&pk4).is_some());

        // Set 1 is gone.
        assert!(manager.peer_set(1).await.is_none());
    });
}
2752
/// Asserts that a registered sender resolves `Recipients::All` to nothing before
/// any peer set is tracked, and to the other tracked peer once a set containing
/// both keys has been tracked.
#[test]
fn test_connected_subscription_updates_after_track() {
    deterministic::Runner::default().start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(2),
        };
        let (network, oracle) = Network::new(context.child("network"), cfg);
        network.start();

        let local = PrivateKey::from_seed(1).public_key();
        let remote = PrivateKey::from_seed(2).public_key();
        let (mut sender, _) = oracle
            .control(local.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        // No peer set has been tracked yet, so there is nobody to address.
        let before = sender.check(Recipients::All).unwrap().recipients();
        assert!(before.is_empty());

        // Track a set holding both keys and wait until it can be queried back.
        let mut manager = oracle.manager();
        manager.track(1, Set::try_from([local, remote.clone()]).unwrap());
        assert!(manager.peer_set(1).await.is_some());

        // Only the other key is expected as a recipient.
        let after = sender.check(Recipients::All).unwrap().recipients();
        assert_eq!(after, vec![remote]);
    });
}
2791
/// With a single tracked peer set, asserts that a message is received while the
/// sender is in the tracked set, is not received within ten seconds after a newer
/// set omits the sender, and is received again once a later set names the sender.
#[test]
fn test_sender_removed_from_peer_set_drops_message() {
    deterministic::Runner::default().start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.child("network"), cfg);
        network.start();
        let mut manager = oracle.manager();
        let mut subscription = manager.subscribe().await;

        // Track a set containing both endpoints and wait for the notification.
        let sender_pk = PrivateKey::from_seed(1).public_key();
        let recipient_pk = PrivateKey::from_seed(2).public_key();
        let both = Set::try_from(vec![sender_pk.clone(), recipient_pk.clone()]).unwrap();
        manager.track(1, both.clone());
        let notice = subscription.recv().await.unwrap();
        assert_eq!(notice.index, 1);

        let (mut sender, _) = oracle
            .control(sender_pk.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (_sender2, mut receiver) = oracle
            .control(recipient_pk.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        // One-way link from the sender to the recipient.
        let link = Link {
            latency: Duration::from_millis(1),
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        oracle
            .add_link(sender_pk.clone(), recipient_pk.clone(), link)
            .await
            .unwrap();

        // While tracked, the message goes out and arrives.
        let initial_msg = IoBuf::from(b"tracked");
        let delivered = sender.send(
            Recipients::One(recipient_pk.clone()),
            initial_msg.clone(),
            false,
        );
        assert_eq!(delivered, vec![recipient_pk.clone()]);
        let (_pk, payload) = receiver.recv().await.unwrap();
        assert_eq!(payload, initial_msg);

        // Track a newer set that leaves the sender out.
        let other_pk = PrivateKey::from_seed(3).public_key();
        manager.track(
            2,
            Set::try_from(vec![recipient_pk.clone(), other_pk]).unwrap(),
        );
        let notice = subscription.recv().await.unwrap();
        assert_eq!(notice.index, 2);

        // `send` still reports the recipient, yet nothing may reach the receiver.
        let delivered = sender.send(
            Recipients::One(recipient_pk.clone()),
            IoBuf::from(b"untracked"),
            false,
        );
        assert_eq!(delivered, vec![recipient_pk.clone()]);

        select! {
            _ = receiver.recv() => {
                panic!("unexpected message");
            },
            _ = context.sleep(Duration::from_secs(10)) => {},
        }

        // Track the original membership again under a new index.
        manager.track(3, both);
        let notice = subscription.recv().await.unwrap();
        assert_eq!(notice.index, 3);

        // Delivery works once more.
        let delivered = sender.send(
            Recipients::One(recipient_pk.clone()),
            initial_msg.clone(),
            false,
        );
        assert_eq!(delivered, vec![recipient_pk]);
        let (_pk, payload) = receiver.recv().await.unwrap();
        assert_eq!(payload, initial_msg);
    });
}
2903
/// Tracks four peer sets in sequence and asserts the content of every update
/// delivered to a subscriber: the index, the latest primary set, the aggregated
/// primary set, and that both secondary sets are empty.
#[test]
fn test_subscribe_to_peer_sets() {
    deterministic::Runner::default().start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(2),
        };
        let (network, oracle) = Network::new(context.child("network"), cfg);
        network.start();

        let mut manager = oracle.manager();
        let mut subscription = manager.subscribe().await;

        let pk1 = PrivateKey::from_seed(1).public_key();
        let pk2 = PrivateKey::from_seed(2).public_key();
        let pk3 = PrivateKey::from_seed(3).public_key();

        let set_12 = Set::try_from(vec![pk1.clone(), pk2.clone()]).unwrap();
        let set_23 = Set::try_from(vec![pk2.clone(), pk3.clone()]).unwrap();
        let set_13 = Set::try_from(vec![pk1.clone(), pk3.clone()]).unwrap();
        let set_123 = Set::try_from(vec![pk1, pk2, pk3]).unwrap();

        // Each row: (index to track, set to track, expected aggregated primary set).
        // The expected aggregate equals the union of the two most recently tracked
        // sets, which lines up with `tracked_peer_sets` being 2.
        let rounds = [
            (1, set_12.clone(), set_12),
            (2, set_23, set_123.clone()),
            (3, set_13.clone(), set_123),
            (4, set_13.clone(), set_13),
        ];

        for (index, tracked, aggregate) in rounds {
            manager.track(index, tracked.clone());

            let update = subscription.recv().await.unwrap();
            assert_eq!(update.index, index);
            assert_eq!(update.latest.primary, tracked);
            assert!(update.latest.secondary.is_empty());
            assert_eq!(update.all.primary, aggregate);
            assert!(update.all.secondary.is_empty());
        }
    });
}
3000
/// Asserts that every live subscription receives each tracked peer set, and that
/// dropping one subscription leaves the remaining ones receiving updates.
#[test]
fn test_multiple_subscriptions() {
    deterministic::Runner::default().start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let (network, oracle) = Network::new(context.child("network"), cfg);
        network.start();

        let mut manager = oracle.manager();
        let mut first = manager.subscribe().await;
        let mut second = manager.subscribe().await;
        let mut third = manager.subscribe().await;

        // The same two-member set is tracked under both indices.
        let peers = Set::try_from(vec![
            PrivateKey::from_seed(1).public_key(),
            PrivateKey::from_seed(2).public_key(),
        ])
        .unwrap();

        // All three subscribers see index 1.
        manager.track(1, peers.clone());
        for subscriber in [&mut first, &mut second, &mut third] {
            let update = subscriber.recv().await.unwrap();
            assert_eq!(update.index, 1);
        }

        // Close the middle subscription before the next update.
        drop(second);

        // The two survivors see index 2.
        manager.track(2, peers);
        for subscriber in [&mut first, &mut third] {
            let update = subscriber.recv().await.unwrap();
            assert_eq!(update.index, 2);
        }
    });
}
3051
/// Registers one key, then asserts that peer-set updates contain exactly the
/// tracked keys: the registered key is absent from an update whose set omits it
/// and present in an update whose set names it.
#[test]
fn test_subscription_includes_self_when_registered() {
    deterministic::Runner::default().start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(2),
        };
        let (network, oracle) = Network::new(context.child("network"), cfg);
        network.start();

        let self_pk = PrivateKey::from_seed(0).public_key();
        let other_pk = PrivateKey::from_seed(1).public_key();

        // Register `self_pk`; both channel halves are held until the test ends.
        let (_sender, _receiver) = oracle
            .control(self_pk.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        let mut manager = oracle.manager();
        let mut subscription = manager.subscribe().await;

        // Round one: the tracked set holds only the other key.
        manager.track(1, Set::try_from(vec![other_pk.clone()]).unwrap());

        let first = subscription.recv().await.unwrap();
        assert_eq!(first.index, 1);
        assert_eq!(first.latest.primary.len(), 1);
        assert!(first.latest.secondary.is_empty());
        assert_eq!(first.all.primary.len(), 1);
        assert!(first.all.secondary.is_empty());

        let latest = &first.latest.primary;
        assert!(
            latest.position(&self_pk).is_none(),
            "latest primary set should not include self"
        );
        assert!(
            latest.position(&other_pk).is_some(),
            "latest primary set should include other"
        );

        let all = &first.all.primary;
        assert!(
            all.position(&self_pk).is_none(),
            "peer set should not include self"
        );
        assert!(
            all.position(&other_pk).is_some(),
            "peer set should include other"
        );

        // Round two: the tracked set names both keys.
        manager.track(
            2,
            Set::try_from(vec![self_pk.clone(), other_pk.clone()]).unwrap(),
        );

        let second = subscription.recv().await.unwrap();
        assert_eq!(second.index, 2);
        assert_eq!(second.latest.primary.len(), 2);
        assert!(second.latest.secondary.is_empty());
        assert_eq!(second.all.primary.len(), 2);
        assert!(second.all.secondary.is_empty());

        let latest = &second.latest.primary;
        assert!(
            latest.position(&self_pk).is_some(),
            "latest primary set should include self"
        );
        assert!(
            latest.position(&other_pk).is_some(),
            "latest primary set should include other"
        );

        let all = &second.all.primary;
        assert!(
            all.position(&self_pk).is_some(),
            "peer set should include self"
        );
        assert!(
            all.position(&other_pk).is_some(),
            "peer set should include other"
        );
    });
}
3146
/// Registers a sender with a quota of one per second and asserts that the first
/// send is accepted, an immediate second send reports no recipients, and a send
/// issued after sleeping one second is accepted and received.
#[test]
fn test_rate_limiting() {
    deterministic::Runner::default().start(|context| async move {
        let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
        let pk2 = ed25519::PrivateKey::from_seed(2).public_key();

        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let (network, oracle) =
            Network::new_with_peers(context.child("network"), cfg, [pk1.clone(), pk2.clone()])
                .await;
        network.start();

        // Only the sending side gets the tight quota.
        let restrictive_quota = Quota::per_second(NZU32!(1));
        let control1 = oracle.control(pk1.clone());
        let (mut sender, _) = control1.register(0, restrictive_quota).await.unwrap();
        let control2 = oracle.control(pk2.clone());
        let (_, mut receiver) = control2.register(0, TEST_QUOTA).await.unwrap();

        // Zero-latency, lossless links in both directions.
        let link = ingress::Link {
            latency: Duration::ZERO,
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        oracle
            .add_link(pk1.clone(), pk2.clone(), link.clone())
            .await
            .unwrap();
        oracle.add_link(pk2.clone(), pk1, link).await.unwrap();

        // First message: accepted and received.
        let msg1 = IoBuf::from(b"message1");
        let accepted = sender.send(Recipients::One(pk2.clone()), msg1.clone(), false);
        assert_eq!(accepted.len(), 1, "first message should be sent");

        let (_, payload) = receiver.recv().await.unwrap();
        assert_eq!(payload, msg1);

        // Second message right away: no recipient is reported.
        let accepted = sender.send(
            Recipients::One(pk2.clone()),
            IoBuf::from(b"message2"),
            false,
        );
        assert_eq!(
            accepted.len(),
            0,
            "second message should be rate-limited (skipped)"
        );

        // Let one second elapse before trying again.
        context.sleep(Duration::from_secs(1)).await;

        // Third message: accepted and received.
        let msg3 = IoBuf::from(b"message3");
        let accepted = sender.send(Recipients::One(pk2.clone()), msg3.clone(), false);
        assert_eq!(accepted.len(), 1, "third message should be sent after wait");

        let (_, payload) = receiver.recv().await.unwrap();
        assert_eq!(payload, msg3);
    });
}
3215
/// Aborts the network task and then invokes sender, manager, oracle and control
/// operations. Only the `send` result is asserted (it must be empty); every other
/// result is discarded, so those calls are merely required to return.
#[test]
fn test_operations_after_shutdown_do_not_panic() {
    deterministic::Runner::default().start(|context| async move {
        let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
        let pk2 = ed25519::PrivateKey::from_seed(2).public_key();

        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let (network, oracle) =
            Network::new_with_peers(context.child("network"), cfg, [pk1.clone(), pk2.clone()])
                .await;
        let handle = network.start();
        let mut manager = oracle.manager();

        // `_receiver` is a named binding so it stays alive for the whole test.
        let control1 = oracle.control(pk1.clone());
        let (mut sender, _receiver) = control1.register(0, TEST_QUOTA).await.unwrap();

        let link = ingress::Link {
            latency: Duration::from_millis(10),
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        oracle
            .add_link(pk1.clone(), pk2.clone(), link.clone())
            .await
            .unwrap();
        wait_for_task_count(&context, "network", |running| running > 0).await;

        // Stop the network and wait until no "network" task is counted as running.
        handle.abort();
        let _ = handle.await;
        wait_for_task_count(&context, "network", |running| running == 0).await;

        // Sender: must report that nothing was sent.
        let outcome = sender.send(Recipients::One(pk2.clone()), IoBuf::from(b"test"), false);
        assert!(outcome.is_empty(), "send after shutdown should return empty");

        // Manager operations.
        manager.track(1, Set::try_from([pk1.clone()]).unwrap());
        let _ = manager.peer_set(0).await;
        let _ = manager.subscribe().await;

        // Oracle operations.
        let _ = oracle
            .add_link(pk1.clone(), pk2.clone(), link.clone())
            .await;
        let _ = oracle.remove_link(pk1.clone(), pk2.clone()).await;
        let _ = oracle.blocked().await;

        // Control operations.
        let _ = control1.register(1, TEST_QUOTA).await;
    });
}
3277
3278 fn clean_shutdown(seed: u64) {
3279 let cfg = deterministic::Config::default()
3280 .with_seed(seed)
3281 .with_timeout(Some(Duration::from_secs(30)));
3282 let executor = deterministic::Runner::new(cfg);
3283 executor.start(|context| async move {
3284 let cfg = Config {
3285 max_size: 1024 * 1024,
3286 disconnect_on_block: true,
3287 tracked_peer_sets: NZUsize!(3),
3288 };
3289 let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
3291 let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
3292
3293 let (network, oracle) =
3294 Network::new_with_peers(context.child("network"), cfg, [pk1.clone(), pk2.clone()])
3295 .await;
3296 let handle = network.start();
3297
3298 let control1 = oracle.control(pk1.clone());
3300 let control2 = oracle.control(pk2.clone());
3301 let (mut sender, _) = control1.register(0, TEST_QUOTA).await.unwrap();
3302 let (_, mut receiver) = control2.register(0, TEST_QUOTA).await.unwrap();
3303
3304 let link = ingress::Link {
3306 latency: Duration::from_millis(10),
3307 jitter: Duration::from_millis(0),
3308 success_rate: 1.0,
3309 };
3310 oracle
3311 .add_link(pk1.clone(), pk2.clone(), link.clone())
3312 .await
3313 .unwrap();
3314 oracle
3315 .add_link(pk2.clone(), pk1.clone(), link)
3316 .await
3317 .unwrap();
3318
3319 wait_for_task_count(&context, "network", |count| count > 0).await;
3321 let running_before = count_running_tasks(&context, "network");
3322 assert!(
3323 running_before > 0,
3324 "at least one network task should be running"
3325 );
3326
3327 let msg = IoBuf::from(b"test_message");
3329 let result = sender.send(Recipients::One(pk2.clone()), msg.clone(), false);
3330 assert_eq!(result.len(), 1, "message should be sent");
3331
3332 let (_, received) = receiver.recv().await.unwrap();
3333 assert_eq!(received, msg, "message should be received");
3334
3335 handle.abort();
3337 let _ = handle.await;
3338
3339 wait_for_task_count(&context, "network", |count| count == 0).await;
3341 let running_after = count_running_tasks(&context, "network");
3342 assert_eq!(
3343 running_after, 0,
3344 "all network tasks should be stopped, but {running_after} still running"
3345 );
3346 });
3347 }
3348
/// Runs the `clean_shutdown` scenario once for each seed in `0..25`.
#[test]
fn test_clean_shutdown() {
    (0u64..25).for_each(clean_shutdown);
}
3355
/// Calls `track` and then `overwrite` on the oracle's socket manager.
///
/// NOTE(review): nothing is asserted or awaited after `overwrite`, so this test
/// only checks that the two calls can be made; it does not observe their effect.
#[test]
fn test_socket_manager_overwrite() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let (network, oracle) = Network::new(
            context.child("network"),
            Config {
                max_size: 1024 * 1024,
                disconnect_on_block: true,
                tracked_peer_sets: NZUsize!(3),
            },
        );
        network.start();

        let pk1 = PrivateKey::from_seed(1).public_key();
        let pk2 = PrivateKey::from_seed(2).public_key();

        let mut socket_manager = oracle.socket_manager();

        // Replacement address handed to `overwrite` for `pk1` below.
        let addr: Address = "127.0.0.1:8000".parse::<SocketAddr>().unwrap().into();

        // Track index 0 with an address for each of the two keys.
        socket_manager.track(
            0,
            Map::<PublicKey, Address>::try_from([
                (
                    pk1.clone(),
                    "127.0.0.1:8001".parse::<SocketAddr>().unwrap().into(),
                ),
                (pk2, "127.0.0.1:8002".parse::<SocketAddr>().unwrap().into()),
            ])
            .unwrap(),
        );

        // Overwrite with a map that names only `pk1`, now at `addr`.
        socket_manager.overwrite([(pk1, addr)].try_into().unwrap());
    });
}
3398
/// Tracks a peer set before subscribing and asserts that the new subscription
/// already holds an update for it, readable with a non-blocking `try_recv`.
#[test]
fn test_subscribe_returns_current_peer_set() {
    deterministic::Runner::default().start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let (network, oracle) = Network::new(context.child("network"), cfg);
        network.start();

        let tracked = ordered::Set::try_from(vec![
            PrivateKey::from_seed(0).public_key(),
            PrivateKey::from_seed(1).public_key(),
        ])
        .unwrap();

        // Track first, subscribe second.
        let mut manager = oracle.manager();
        Manager::track(&mut manager, 0, tracked.clone());

        let mut subscription = Provider::subscribe(&mut manager).await;

        // The update must already be queued; no further await happens here.
        let update = subscription
            .try_recv()
            .expect("current peer set should be available immediately after subscribe");
        assert_eq!(update.index, 0);
        assert_eq!(update.latest.primary, tracked);
        assert!(update.latest.secondary.is_empty());
    });
}
3432}