1use bytes::Bytes;
54use commonware_utils::{channel::oneshot, Span};
55use std::future::Future;
56
57mod config;
58pub use config::Config;
59mod engine;
60pub use engine::Engine;
61mod fetcher;
62mod ingress;
63pub use ingress::Mailbox;
64mod metrics;
65mod wire;
66
67#[cfg(feature = "mocks")]
68pub mod mocks;
69
70pub trait Producer: Clone + Send + 'static {
72 type Key: Span;
74
75 fn produce(&mut self, key: Self::Key) -> impl Future<Output = oneshot::Receiver<Bytes>> + Send;
77}
78
79#[cfg(test)]
80mod tests {
81 use super::{
82 mocks::{Consumer, Event, Key, Producer},
83 Config, Engine, Mailbox,
84 };
85 use crate::Resolver;
86 use bytes::Bytes;
87 use commonware_cryptography::{
88 ed25519::{PrivateKey, PublicKey},
89 Signer,
90 };
91 use commonware_macros::{select, test_traced};
92 use commonware_p2p::{
93 simulated::{Link, Network, Oracle, Receiver, Sender},
94 Blocker, Manager as _, Provider, TrackedPeers,
95 };
96 use commonware_runtime::{count_running_tasks, deterministic, Clock, Metrics, Quota, Runner};
97 use commonware_utils::{non_empty_vec, ordered::Set, NZUsize, NZU32};
98 use std::{collections::HashMap, num::NonZeroU32, time::Duration};
99
100 const MAILBOX_SIZE: usize = 1024;
101 const RATE_LIMIT: NonZeroU32 = NZU32!(10);
102 const INITIAL_DURATION: Duration = Duration::from_millis(100);
103 const TIMEOUT: Duration = Duration::from_millis(400);
104 const FETCH_RETRY_TIMEOUT: Duration = Duration::from_millis(100);
105 const LINK: Link = Link {
106 latency: Duration::from_millis(10),
107 jitter: Duration::from_millis(1),
108 success_rate: 1.0,
109 };
110 const LINK_UNRELIABLE: Link = Link {
111 latency: Duration::from_millis(10),
112 jitter: Duration::from_millis(1),
113 success_rate: 0.5,
114 };
115
116 async fn setup_network_and_peers(
117 context: &deterministic::Context,
118 peer_seeds: &[u64],
119 ) -> (
120 Oracle<PublicKey, deterministic::Context>,
121 Vec<PrivateKey>,
122 Vec<PublicKey>,
123 Vec<(
124 Sender<PublicKey, deterministic::Context>,
125 Receiver<PublicKey>,
126 )>,
127 ) {
128 setup_network_and_peers_with_rate_limit(context, peer_seeds, Quota::per_second(RATE_LIMIT))
129 .await
130 }
131
132 async fn setup_network_and_peers_with_rate_limit(
133 context: &deterministic::Context,
134 peer_seeds: &[u64],
135 rate_limit: Quota,
136 ) -> (
137 Oracle<PublicKey, deterministic::Context>,
138 Vec<PrivateKey>,
139 Vec<PublicKey>,
140 Vec<(
141 Sender<PublicKey, deterministic::Context>,
142 Receiver<PublicKey>,
143 )>,
144 ) {
145 let (network, oracle) = Network::new(
146 context.with_label("network"),
147 commonware_p2p::simulated::Config {
148 max_size: 1024 * 1024,
149 disconnect_on_block: true,
150 tracked_peer_sets: NZUsize!(3),
151 },
152 );
153 network.start();
154
155 let schemes: Vec<PrivateKey> = peer_seeds
156 .iter()
157 .map(|seed| PrivateKey::from_seed(*seed))
158 .collect();
159 let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
160 let mut manager = oracle.manager();
161 manager
162 .track(0, Set::try_from(peers.clone()).unwrap())
163 .await;
164
165 let mut connections = Vec::new();
166 for peer in &peers {
167 let (sender, receiver) = oracle
168 .control(peer.clone())
169 .register(0, rate_limit)
170 .await
171 .unwrap();
172 connections.push((sender, receiver));
173 }
174
175 (oracle, schemes, peers, connections)
176 }
177
178 async fn add_link(
179 oracle: &mut Oracle<PublicKey, deterministic::Context>,
180 link: Link,
181 peers: &[PublicKey],
182 from: usize,
183 to: usize,
184 ) {
185 oracle
186 .add_link(peers[from].clone(), peers[to].clone(), link.clone())
187 .await
188 .unwrap();
189 oracle
190 .add_link(peers[to].clone(), peers[from].clone(), link)
191 .await
192 .unwrap();
193 }
194
195 fn setup_and_spawn_actor(
196 context: &deterministic::Context,
197 provider: impl Provider<PublicKey = PublicKey>,
198 blocker: impl Blocker<PublicKey = PublicKey>,
199 signer: impl Signer<PublicKey = PublicKey>,
200 connection: (
201 Sender<PublicKey, deterministic::Context>,
202 Receiver<PublicKey>,
203 ),
204 consumer: Consumer<Key, Bytes>,
205 producer: Producer<Key, Bytes>,
206 ) -> Mailbox<Key, PublicKey> {
207 let public_key = signer.public_key();
208 let (engine, mailbox) = Engine::new(
209 context.with_label(&format!("actor_{public_key}")),
210 Config {
211 peer_provider: provider,
212 blocker,
213 consumer,
214 producer,
215 mailbox_size: MAILBOX_SIZE,
216 me: Some(public_key),
217 initial: INITIAL_DURATION,
218 timeout: TIMEOUT,
219 fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
220 priority_requests: false,
221 priority_responses: false,
222 },
223 );
224 engine.start(connection);
225
226 mailbox
227 }
228
229 #[test_traced]
233 fn test_fetch_success() {
234 let executor = deterministic::Runner::timed(Duration::from_secs(10));
235 executor.start(|context| async move {
236 let (mut oracle, mut schemes, peers, mut connections) =
237 setup_network_and_peers(&context, &[1, 2]).await;
238
239 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
240
241 let key = Key(2);
242 let mut prod2 = Producer::default();
243 prod2.insert(key.clone(), Bytes::from("data for key 2"));
244
245 let (cons1, mut cons_out1) = Consumer::new();
246
247 let scheme = schemes.remove(0);
248 let mut mailbox1 = setup_and_spawn_actor(
249 &context,
250 oracle.manager(),
251 oracle.control(scheme.public_key()),
252 scheme,
253 connections.remove(0),
254 cons1,
255 Producer::default(),
256 );
257
258 let scheme = schemes.remove(0);
259 let _mailbox2 = setup_and_spawn_actor(
260 &context,
261 oracle.manager(),
262 oracle.control(scheme.public_key()),
263 scheme,
264 connections.remove(0),
265 Consumer::dummy(),
266 prod2,
267 );
268
269 mailbox1.fetch(key.clone()).await;
270
271 let event = cons_out1.recv().await.unwrap();
272 match event {
273 Event::Success(key_actual, value) => {
274 assert_eq!(key_actual, key);
275 assert_eq!(value, Bytes::from("data for key 2"));
276 }
277 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
278 }
279 });
280 }
281
282 #[test_traced]
286 fn test_cancel_fetch() {
287 let executor = deterministic::Runner::timed(Duration::from_secs(10));
288 executor.start(|context| async move {
289 let (oracle, mut schemes, _peers, mut connections) =
290 setup_network_and_peers(&context, &[1]).await;
291
292 let (cons1, mut cons_out1) = Consumer::new();
293 let prod1 = Producer::default();
294
295 let scheme = schemes.remove(0);
296 let mut mailbox1 = setup_and_spawn_actor(
297 &context,
298 oracle.manager(),
299 oracle.control(scheme.public_key()),
300 scheme,
301 connections.remove(0),
302 cons1,
303 prod1,
304 );
305
306 let key = Key(3);
307 mailbox1.fetch(key.clone()).await;
308 mailbox1.cancel(key.clone()).await;
309
310 let event = cons_out1.recv().await.unwrap();
311 match event {
312 Event::Failed(key_actual) => {
313 assert_eq!(key_actual, key);
314 }
315 Event::Success(_, _) => panic!("Fetch should have been canceled"),
316 }
317 });
318 }
319
320 #[test_traced]
325 fn test_peer_no_data() {
326 let executor = deterministic::Runner::timed(Duration::from_secs(10));
327 executor.start(|context| async move {
328 let (mut oracle, mut schemes, peers, mut connections) =
329 setup_network_and_peers(&context, &[1, 2, 3]).await;
330
331 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
332 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
333
334 let prod1 = Producer::default();
335 let prod2 = Producer::default();
336 let mut prod3 = Producer::default();
337 let key = Key(3);
338 prod3.insert(key.clone(), Bytes::from("data for key 3"));
339
340 let (cons1, mut cons_out1) = Consumer::new();
341
342 let scheme = schemes.remove(0);
343 let mut mailbox1 = setup_and_spawn_actor(
344 &context,
345 oracle.manager(),
346 oracle.control(scheme.public_key()),
347 scheme,
348 connections.remove(0),
349 cons1,
350 prod1,
351 );
352
353 let scheme = schemes.remove(0);
354 let _mailbox2 = setup_and_spawn_actor(
355 &context,
356 oracle.manager(),
357 oracle.control(scheme.public_key()),
358 scheme,
359 connections.remove(0),
360 Consumer::dummy(),
361 prod2,
362 );
363
364 let scheme = schemes.remove(0);
365 let _mailbox3 = setup_and_spawn_actor(
366 &context,
367 oracle.manager(),
368 oracle.control(scheme.public_key()),
369 scheme,
370 connections.remove(0),
371 Consumer::dummy(),
372 prod3,
373 );
374
375 mailbox1.fetch(key.clone()).await;
376
377 let event = cons_out1.recv().await.unwrap();
378 match event {
379 Event::Success(key_actual, value) => {
380 assert_eq!(key_actual, key);
381 assert_eq!(value, Bytes::from("data for key 3"));
382 }
383 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
384 }
385 });
386 }
387
388 #[test_traced]
393 fn test_no_peers_available() {
394 let executor = deterministic::Runner::timed(Duration::from_secs(10));
395 executor.start(|context| async move {
396 let (oracle, mut schemes, _peers, mut connections) =
397 setup_network_and_peers(&context, &[1]).await;
398
399 let (cons1, mut cons_out1) = Consumer::new();
400 let prod1 = Producer::default();
401
402 let scheme = schemes.remove(0);
403 let mut mailbox1 = setup_and_spawn_actor(
404 &context,
405 oracle.manager(),
406 oracle.control(scheme.public_key()),
407 scheme,
408 connections.remove(0),
409 cons1,
410 prod1,
411 );
412
413 let key = Key(4);
414 mailbox1.fetch(key.clone()).await;
415 context.sleep(Duration::from_secs(5)).await;
416 mailbox1.cancel(key.clone()).await;
417
418 let event = cons_out1.recv().await.expect("Consumer channel closed");
419 match event {
420 Event::Failed(key_actual) => {
421 assert_eq!(key_actual, key);
422 }
423 Event::Success(_, _) => {
424 panic!("Fetch should have failed due to no peers")
425 }
426 }
427 });
428 }
429
430 #[test_traced]
433 fn test_fetch_before_initial_peer_set_waits_for_update() {
434 let executor = deterministic::Runner::timed(Duration::from_secs(10));
435 executor.start(|context| async move {
436 let (network, mut oracle) = Network::new(
437 context.with_label("network"),
438 commonware_p2p::simulated::Config {
439 max_size: 1024 * 1024,
440 disconnect_on_block: true,
441 tracked_peer_sets: NZUsize!(1),
442 },
443 );
444 network.start();
445
446 let mut schemes = [1_u64, 2]
447 .into_iter()
448 .map(PrivateKey::from_seed)
449 .collect::<Vec<_>>();
450 schemes.sort_by_key(|s| s.public_key());
451 let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
452
453 let mut connections = Vec::new();
454 for peer in &peers {
455 let (sender, receiver) = oracle
456 .control(peer.clone())
457 .register(0, Quota::per_second(RATE_LIMIT))
458 .await
459 .unwrap();
460 connections.push((sender, receiver));
461 }
462
463 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
464
465 let key = Key(2);
466 let mut prod2 = Producer::default();
467 prod2.insert(key.clone(), Bytes::from("data for key 2"));
468
469 let (cons1, mut cons_out1) = Consumer::new();
470
471 let scheme = schemes.remove(0);
472 let mut mailbox1 = setup_and_spawn_actor(
473 &context,
474 oracle.manager(),
475 oracle.control(scheme.public_key()),
476 scheme,
477 connections.remove(0),
478 cons1,
479 Producer::default(),
480 );
481
482 let scheme = schemes.remove(0);
483 let _mailbox2 = setup_and_spawn_actor(
484 &context,
485 oracle.manager(),
486 oracle.control(scheme.public_key()),
487 scheme,
488 connections.remove(0),
489 Consumer::dummy(),
490 prod2,
491 );
492
493 mailbox1.fetch(key.clone()).await;
494
495 select! {
496 event = cons_out1.recv() => {
497 panic!("fetch should wait for the initial peer set, got {event:?}");
498 },
499 _ = context.sleep(Duration::from_millis(200)) => {}
500 };
501
502 oracle
503 .manager()
504 .track(0, Set::try_from(peers.clone()).unwrap())
505 .await;
506
507 let event = cons_out1.recv().await.unwrap();
508 match event {
509 Event::Success(key_actual, value) => {
510 assert_eq!(key_actual, key);
511 assert_eq!(value, Bytes::from("data for key 2"));
512 }
513 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
514 }
515 });
516 }
517
518 #[test_traced]
522 fn test_concurrent_fetch_requests() {
523 let executor = deterministic::Runner::timed(Duration::from_secs(60));
524 executor.start(|context| async move {
525 let (mut oracle, mut schemes, peers, mut connections) =
526 setup_network_and_peers(&context, &[1, 2, 3]).await;
527
528 let key2 = Key(2);
529 let key3 = Key(3);
530 let mut prod2 = Producer::default();
531 prod2.insert(key2.clone(), Bytes::from("data for key 2"));
532 let mut prod3 = Producer::default();
533 prod3.insert(key3.clone(), Bytes::from("data for key 3"));
534
535 let (cons1, mut cons_out1) = Consumer::new();
536
537 let scheme = schemes.remove(0);
538 let mut mailbox1 = setup_and_spawn_actor(
539 &context,
540 oracle.manager(),
541 oracle.control(scheme.public_key()),
542 scheme,
543 connections.remove(0),
544 cons1,
545 Producer::default(),
546 );
547
548 let scheme = schemes.remove(0);
549 let _mailbox2 = setup_and_spawn_actor(
550 &context,
551 oracle.manager(),
552 oracle.control(scheme.public_key()),
553 scheme,
554 connections.remove(0),
555 Consumer::dummy(),
556 prod2,
557 );
558
559 let scheme = schemes.remove(0);
560 let _mailbox3 = setup_and_spawn_actor(
561 &context,
562 oracle.manager(),
563 oracle.control(scheme.public_key()),
564 scheme,
565 connections.remove(0),
566 Consumer::dummy(),
567 prod3,
568 );
569
570 add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 1).await;
572 add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 2).await;
573
574 for _ in 0..10 {
576 mailbox1.fetch(key2.clone()).await;
578 mailbox1.fetch(key3.clone()).await;
579
580 let mut events = Vec::new();
582 events.push(cons_out1.recv().await.expect("Consumer channel closed"));
583 events.push(cons_out1.recv().await.expect("Consumer channel closed"));
584
585 let mut found_key2 = false;
587 let mut found_key3 = false;
588 for event in events {
589 match event {
590 Event::Success(key_actual, value) => {
591 if key_actual == key2 {
592 assert_eq!(value, Bytes::from("data for key 2"));
593 found_key2 = true;
594 } else if key_actual == key3 {
595 assert_eq!(value, Bytes::from("data for key 3"));
596 found_key3 = true;
597 } else {
598 panic!("Unexpected key received");
599 }
600 }
601 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
602 }
603 }
604 assert!(found_key2 && found_key3,);
605 }
606 });
607 }
608
609 #[test_traced]
612 fn test_cancel() {
613 let executor = deterministic::Runner::timed(Duration::from_secs(10));
614 executor.start(|context| async move {
615 let (mut oracle, mut schemes, peers, mut connections) =
616 setup_network_and_peers(&context, &[1, 2]).await;
617
618 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
619
620 let key = Key(6);
621 let mut prod2 = Producer::default();
622 prod2.insert(key.clone(), Bytes::from("data for key 6"));
623
624 let (cons1, mut cons_out1) = Consumer::new();
625
626 let scheme = schemes.remove(0);
627 let mut mailbox1 = setup_and_spawn_actor(
628 &context,
629 oracle.manager(),
630 oracle.control(scheme.public_key()),
631 scheme,
632 connections.remove(0),
633 cons1,
634 Producer::default(),
635 );
636
637 let scheme = schemes.remove(0);
638 let _mailbox2 = setup_and_spawn_actor(
639 &context,
640 oracle.manager(),
641 oracle.control(scheme.public_key()),
642 scheme,
643 connections.remove(0),
644 Consumer::dummy(),
645 prod2,
646 );
647
648 mailbox1.cancel(key.clone()).await;
650 select! {
651 _ = cons_out1.recv() => {
652 panic!("unexpected event");
653 },
654 _ = context.sleep(Duration::from_millis(100)) => {},
655 };
656
657 mailbox1.fetch(key.clone()).await;
659 let event = cons_out1.recv().await.unwrap();
660 match event {
661 Event::Success(key_actual, value) => {
662 assert_eq!(key_actual, key);
663 assert_eq!(value, Bytes::from("data for key 6"));
664 }
665 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
666 }
667
668 mailbox1.cancel(key.clone()).await;
670 select! {
671 _ = cons_out1.recv() => {
672 panic!("unexpected event");
673 },
674 _ = context.sleep(Duration::from_millis(100)) => {},
675 };
676
677 let key = Key(7);
679 mailbox1.fetch(key.clone()).await;
680 mailbox1.cancel(key.clone()).await;
681
682 let event = cons_out1.recv().await.unwrap();
684 match event {
685 Event::Failed(key_actual) => {
686 assert_eq!(key_actual, key);
687 }
688 Event::Success(_, _) => panic!("Fetch should have been canceled"),
689 }
690 });
691 }
692
693 #[test_traced]
696 fn test_blocking_peer() {
697 let executor = deterministic::Runner::timed(Duration::from_secs(10));
698 executor.start(|context| async move {
699 let (mut oracle, mut schemes, peers, mut connections) =
700 setup_network_and_peers(&context, &[1, 2, 3]).await;
701
702 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
703 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
704 add_link(&mut oracle, LINK.clone(), &peers, 1, 2).await;
705
706 let key_a = Key(1);
707 let key_b = Key(2);
708 let invalid_data_a = Bytes::from("invalid for A");
709 let valid_data_a = Bytes::from("valid for A");
710 let valid_data_b = Bytes::from("valid for B");
711
712 let mut prod2 = Producer::default();
714 prod2.insert(key_a.clone(), invalid_data_a.clone());
715 prod2.insert(key_b.clone(), valid_data_b.clone());
716
717 let mut prod3 = Producer::default();
718 prod3.insert(key_a.clone(), valid_data_a.clone());
719
720 let (mut cons1, mut cons_out1) = Consumer::new();
722 cons1.add_expected(key_a.clone(), valid_data_a.clone());
723 cons1.add_expected(key_b.clone(), valid_data_b.clone());
724
725 let scheme = schemes.remove(0);
727 let mut mailbox1 = setup_and_spawn_actor(
728 &context,
729 oracle.manager(),
730 oracle.control(scheme.public_key()),
731 scheme,
732 connections.remove(0),
733 cons1,
734 Producer::default(),
735 );
736
737 let scheme = schemes.remove(0);
738 let _mailbox2 = setup_and_spawn_actor(
739 &context,
740 oracle.manager(),
741 oracle.control(scheme.public_key()),
742 scheme,
743 connections.remove(0),
744 Consumer::dummy(),
745 prod2,
746 );
747
748 let scheme = schemes.remove(0);
749 let _mailbox3 = setup_and_spawn_actor(
750 &context,
751 oracle.manager(),
752 oracle.control(scheme.public_key()),
753 scheme,
754 connections.remove(0),
755 Consumer::dummy(),
756 prod3,
757 );
758
759 for _ in 0..20 {
761 mailbox1.fetch(key_a.clone()).await;
763
764 let event = cons_out1.recv().await.unwrap();
766 match event {
767 Event::Success(key_actual, value) => {
768 assert_eq!(key_actual, key_a);
769 assert_eq!(value, valid_data_a);
770 }
771 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
772 }
773 }
774
775 mailbox1.fetch(key_b.clone()).await;
777
778 context.sleep(Duration::from_secs(5)).await;
780
781 mailbox1.cancel(key_b.clone()).await;
783
784 let event = cons_out1.recv().await.unwrap();
786 match event {
787 Event::Failed(key_actual) => {
788 assert_eq!(key_actual, key_b);
789 }
790 Event::Success(_, _) => panic!("Fetch should have been canceled"),
791 }
792
793 let blocked = oracle.blocked().await.unwrap();
795 assert_eq!(blocked.len(), 1);
796 assert_eq!(blocked[0].0, peers[0]);
797 assert_eq!(blocked[0].1, peers[1]);
798 });
799 }
800
801 #[test_traced]
805 fn test_duplicate_fetch_request() {
806 let executor = deterministic::Runner::timed(Duration::from_secs(10));
807 executor.start(|context| async move {
808 let (mut oracle, mut schemes, peers, mut connections) =
809 setup_network_and_peers(&context, &[1, 2]).await;
810
811 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
812
813 let key = Key(5);
814 let mut prod2 = Producer::default();
815 prod2.insert(key.clone(), Bytes::from("data for key 5"));
816
817 let (cons1, mut cons_out1) = Consumer::new();
818
819 let scheme = schemes.remove(0);
820 let mut mailbox1 = setup_and_spawn_actor(
821 &context,
822 oracle.manager(),
823 oracle.control(scheme.public_key()),
824 scheme,
825 connections.remove(0),
826 cons1,
827 Producer::default(),
828 );
829
830 let scheme = schemes.remove(0);
831 let _mailbox2 = setup_and_spawn_actor(
832 &context,
833 oracle.manager(),
834 oracle.control(scheme.public_key()),
835 scheme,
836 connections.remove(0),
837 Consumer::dummy(),
838 prod2,
839 );
840
841 mailbox1.fetch(key.clone()).await;
843 mailbox1.fetch(key.clone()).await;
844
845 let event = cons_out1.recv().await.unwrap();
847 match event {
848 Event::Success(key_actual, value) => {
849 assert_eq!(key_actual, key);
850 assert_eq!(value, Bytes::from("data for key 5"));
851 }
852 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
853 }
854
855 select! {
857 _ = cons_out1.recv() => {
858 panic!("Unexpected second event received for duplicate fetch");
859 },
860 _ = context.sleep(Duration::from_millis(500)) => {
861 },
863 };
864 });
865 }
866
867 #[test_traced]
871 fn test_changing_peer_sets() {
872 let executor = deterministic::Runner::timed(Duration::from_secs(10));
873 executor.start(|context| async move {
874 let (mut oracle, mut schemes, peers, mut connections) =
875 setup_network_and_peers(&context, &[1, 2, 3]).await;
876
877 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
878 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
879
880 let key1 = Key(1);
881 let key2 = Key(2);
882
883 let mut prod2 = Producer::default();
884 prod2.insert(key1.clone(), Bytes::from("data from peer 2"));
885
886 let mut prod3 = Producer::default();
887 prod3.insert(key2.clone(), Bytes::from("data from peer 3"));
888
889 let (cons1, mut cons_out1) = Consumer::new();
890
891 let scheme = schemes.remove(0);
892 let mut mailbox1 = setup_and_spawn_actor(
893 &context,
894 oracle.manager(),
895 oracle.control(scheme.public_key()),
896 scheme,
897 connections.remove(0),
898 cons1,
899 Producer::default(),
900 );
901
902 let scheme = schemes.remove(0);
903 let _mailbox2 = setup_and_spawn_actor(
904 &context,
905 oracle.manager(),
906 oracle.control(scheme.public_key()),
907 scheme,
908 connections.remove(0),
909 Consumer::dummy(),
910 prod2,
911 );
912
913 mailbox1.fetch(key1.clone()).await;
915
916 let event = cons_out1.recv().await.unwrap();
918 match event {
919 Event::Success(key_actual, value) => {
920 assert_eq!(key_actual, key1);
921 assert_eq!(value, Bytes::from("data from peer 2"));
922 }
923 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
924 }
925
926 let scheme = schemes.remove(0);
928 let _mailbox3 = setup_and_spawn_actor(
929 &context,
930 oracle.manager(),
931 oracle.control(scheme.public_key()),
932 scheme,
933 connections.remove(0),
934 Consumer::dummy(),
935 prod3,
936 );
937
938 context.sleep(Duration::from_millis(200)).await;
940
941 mailbox1.fetch(key2.clone()).await;
943
944 let event = cons_out1.recv().await.unwrap();
946 match event {
947 Event::Success(key_actual, value) => {
948 assert_eq!(key_actual, key2);
949 assert_eq!(value, Bytes::from("data from peer 3"));
950 }
951 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
952 }
953 });
954 }
955
956 #[test_traced]
957 fn test_fetch_targeted() {
958 let executor = deterministic::Runner::timed(Duration::from_secs(10));
959 executor.start(|context| async move {
960 let (mut oracle, mut schemes, peers, mut connections) =
961 setup_network_and_peers(&context, &[1, 2, 3]).await;
962
963 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
964 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
965
966 let key = Key(1);
967 let invalid_data = Bytes::from("invalid data");
968 let valid_data = Bytes::from("valid data");
969
970 let mut prod2 = Producer::default();
972 prod2.insert(key.clone(), invalid_data.clone());
973
974 let mut prod3 = Producer::default();
975 prod3.insert(key.clone(), valid_data.clone());
976
977 let (mut cons1, mut cons_out1) = Consumer::new();
979 cons1.add_expected(key.clone(), valid_data.clone());
980
981 let scheme = schemes.remove(0);
982 let mut mailbox1 = setup_and_spawn_actor(
983 &context,
984 oracle.manager(),
985 oracle.control(scheme.public_key()),
986 scheme,
987 connections.remove(0),
988 cons1,
989 Producer::default(),
990 );
991
992 let scheme = schemes.remove(0);
993 let _mailbox2 = setup_and_spawn_actor(
994 &context,
995 oracle.manager(),
996 oracle.control(scheme.public_key()),
997 scheme,
998 connections.remove(0),
999 Consumer::dummy(),
1000 prod2,
1001 );
1002
1003 let scheme = schemes.remove(0);
1004 let _mailbox3 = setup_and_spawn_actor(
1005 &context,
1006 oracle.manager(),
1007 oracle.control(scheme.public_key()),
1008 scheme,
1009 connections.remove(0),
1010 Consumer::dummy(),
1011 prod3,
1012 );
1013
1014 context.sleep(Duration::from_millis(100)).await;
1016
1017 mailbox1
1021 .fetch_targeted(
1022 key.clone(),
1023 non_empty_vec![peers[1].clone(), peers[2].clone()],
1024 )
1025 .await;
1026
1027 let event = cons_out1.recv().await.unwrap();
1029 match event {
1030 Event::Success(key_actual, value) => {
1031 assert_eq!(key_actual, key);
1032 assert_eq!(value, valid_data);
1033 }
1034 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1035 }
1036
1037 let blocked = oracle.blocked().await.unwrap();
1039 assert_eq!(blocked.len(), 1);
1040 assert_eq!(blocked[0].0, peers[0]);
1041 assert_eq!(blocked[0].1, peers[1]);
1042
1043 let metrics = context.encode();
1045 assert!(metrics.contains("_fetch_total{status=\"Success\"} 1"));
1046 });
1047 }
1048
1049 #[test_traced]
1050 fn test_fetch_targeted_no_fallback() {
1051 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1052 executor.start(|context| async move {
1053 let (mut oracle, mut schemes, peers, mut connections) =
1054 setup_network_and_peers(&context, &[1, 2, 3, 4]).await;
1055
1056 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1057 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1058 add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;
1059
1060 let key = Key(1);
1061
1062 let mut prod4 = Producer::default();
1064 prod4.insert(key.clone(), Bytes::from("data from peer 4"));
1065
1066 let (cons1, mut cons_out1) = Consumer::new();
1067
1068 let scheme = schemes.remove(0);
1069 let mut mailbox1 = setup_and_spawn_actor(
1070 &context,
1071 oracle.manager(),
1072 oracle.control(scheme.public_key()),
1073 scheme,
1074 connections.remove(0),
1075 cons1,
1076 Producer::default(),
1077 );
1078
1079 let scheme = schemes.remove(0);
1080 let _mailbox2 = setup_and_spawn_actor(
1081 &context,
1082 oracle.manager(),
1083 oracle.control(scheme.public_key()),
1084 scheme,
1085 connections.remove(0),
1086 Consumer::dummy(),
1087 Producer::default(), );
1089
1090 let scheme = schemes.remove(0);
1091 let _mailbox3 = setup_and_spawn_actor(
1092 &context,
1093 oracle.manager(),
1094 oracle.control(scheme.public_key()),
1095 scheme,
1096 connections.remove(0),
1097 Consumer::dummy(),
1098 Producer::default(), );
1100
1101 let scheme = schemes.remove(0);
1102 let _mailbox4 = setup_and_spawn_actor(
1103 &context,
1104 oracle.manager(),
1105 oracle.control(scheme.public_key()),
1106 scheme,
1107 connections.remove(0),
1108 Consumer::dummy(),
1109 prod4,
1110 );
1111
1112 context.sleep(Duration::from_millis(100)).await;
1114
1115 mailbox1
1118 .fetch_targeted(
1119 key.clone(),
1120 non_empty_vec![peers[1].clone(), peers[2].clone()],
1121 )
1122 .await;
1123
1124 select! {
1127 event = cons_out1.recv() => {
1128 panic!("Fetch should not succeed, but got: {event:?}");
1129 },
1130 _ = context.sleep(Duration::from_secs(3)) => {
1131 },
1133 };
1134 });
1135 }
1136
1137 #[test_traced]
1138 fn test_fetch_all_targeted() {
1139 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1140 executor.start(|context| async move {
1141 let (mut oracle, mut schemes, peers, mut connections) =
1142 setup_network_and_peers(&context, &[1, 2, 3, 4]).await;
1143
1144 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1145 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1146 add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;
1147
1148 let key1 = Key(1);
1149 let key2 = Key(2);
1150 let key3 = Key(3);
1151
1152 let mut prod2 = Producer::default();
1154 prod2.insert(key1.clone(), Bytes::from("data for key 1"));
1155
1156 let mut prod3 = Producer::default();
1158 prod3.insert(key3.clone(), Bytes::from("data for key 3"));
1159
1160 let mut prod4 = Producer::default();
1162 prod4.insert(key2.clone(), Bytes::from("data for key 2"));
1163
1164 let (mut cons1, mut cons_out1) = Consumer::new();
1166 cons1.add_expected(key1.clone(), Bytes::from("data for key 1"));
1167 cons1.add_expected(key2.clone(), Bytes::from("data for key 2"));
1168 cons1.add_expected(key3.clone(), Bytes::from("data for key 3"));
1169
1170 let scheme = schemes.remove(0);
1171 let mut mailbox1 = setup_and_spawn_actor(
1172 &context,
1173 oracle.manager(),
1174 oracle.control(scheme.public_key()),
1175 scheme,
1176 connections.remove(0),
1177 cons1,
1178 Producer::default(),
1179 );
1180
1181 let scheme = schemes.remove(0);
1182 let _mailbox2 = setup_and_spawn_actor(
1183 &context,
1184 oracle.manager(),
1185 oracle.control(scheme.public_key()),
1186 scheme,
1187 connections.remove(0),
1188 Consumer::dummy(),
1189 prod2,
1190 );
1191
1192 let scheme = schemes.remove(0);
1193 let _mailbox3 = setup_and_spawn_actor(
1194 &context,
1195 oracle.manager(),
1196 oracle.control(scheme.public_key()),
1197 scheme,
1198 connections.remove(0),
1199 Consumer::dummy(),
1200 prod3,
1201 );
1202
1203 let scheme = schemes.remove(0);
1204 let _mailbox4 = setup_and_spawn_actor(
1205 &context,
1206 oracle.manager(),
1207 oracle.control(scheme.public_key()),
1208 scheme,
1209 connections.remove(0),
1210 Consumer::dummy(),
1211 prod4,
1212 );
1213
1214 context.sleep(Duration::from_millis(100)).await;
1216
1217 mailbox1
1222 .fetch_all_targeted(vec![
1223 (key1.clone(), non_empty_vec![peers[1].clone()]), (key2.clone(), non_empty_vec![peers[3].clone()]), ])
1226 .await;
1227 mailbox1.fetch(key3.clone()).await; let mut results = HashMap::new();
1231 for _ in 0..3 {
1232 let event = cons_out1.recv().await.unwrap();
1233 match event {
1234 Event::Success(key, value) => {
1235 results.insert(key, value);
1236 }
1237 Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
1238 }
1239 }
1240
1241 assert_eq!(results.len(), 3);
1243 assert_eq!(results.get(&key1).unwrap(), &Bytes::from("data for key 1"));
1244 assert_eq!(results.get(&key2).unwrap(), &Bytes::from("data for key 2"));
1245 assert_eq!(results.get(&key3).unwrap(), &Bytes::from("data for key 3"));
1246
1247 let metrics = context.encode();
1249 assert!(metrics.contains("_fetch_total{status=\"Success\"} 3"));
1250 });
1251 }
1252
1253 #[test_traced]
1256 fn test_fetch_clears_targets() {
1257 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1258 executor.start(|context| async move {
1259 let (mut oracle, mut schemes, peers, mut connections) =
1260 setup_network_and_peers(&context, &[1, 2, 3]).await;
1261
1262 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1263 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1264
1265 let key = Key(1);
1266 let valid_data = Bytes::from("valid data");
1267
1268 let mut prod3 = Producer::default();
1270 prod3.insert(key.clone(), valid_data.clone());
1271
1272 let (cons1, mut cons_out1) = Consumer::new();
1273
1274 let scheme = schemes.remove(0);
1275 let mut mailbox1 = setup_and_spawn_actor(
1276 &context,
1277 oracle.manager(),
1278 oracle.control(scheme.public_key()),
1279 scheme,
1280 connections.remove(0),
1281 cons1,
1282 Producer::default(),
1283 );
1284
1285 let scheme = schemes.remove(0);
1286 let _mailbox2 = setup_and_spawn_actor(
1287 &context,
1288 oracle.manager(),
1289 oracle.control(scheme.public_key()),
1290 scheme,
1291 connections.remove(0),
1292 Consumer::dummy(),
1293 Producer::default(), );
1295
1296 let scheme = schemes.remove(0);
1297 let _mailbox3 = setup_and_spawn_actor(
1298 &context,
1299 oracle.manager(),
1300 oracle.control(scheme.public_key()),
1301 scheme,
1302 connections.remove(0),
1303 Consumer::dummy(),
1304 prod3,
1305 );
1306
1307 context.sleep(Duration::from_millis(100)).await;
1309
1310 mailbox1
1312 .fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()])
1313 .await;
1314
1315 context.sleep(Duration::from_millis(500)).await;
1317
1318 mailbox1.fetch(key.clone()).await;
1320
1321 let event = cons_out1.recv().await.unwrap();
1323 match event {
1324 Event::Success(key_actual, value) => {
1325 assert_eq!(key_actual, key);
1326 assert_eq!(value, valid_data);
1327 }
1328 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1329 }
1330 });
1331 }
1332
1333 #[test_traced]
1334 fn test_fetch_targeted_does_not_restrict_all() {
1335 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1336 executor.start(|context| async move {
1337 let (mut oracle, mut schemes, peers, mut connections) =
1338 setup_network_and_peers(&context, &[1, 2, 3]).await;
1339
1340 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1341 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1342
1343 let key = Key(1);
1344 let valid_data = Bytes::from("valid data");
1345
1346 let mut prod3 = Producer::default();
1348 prod3.insert(key.clone(), valid_data.clone());
1349
1350 let (cons1, mut cons_out1) = Consumer::new();
1351
1352 let scheme = schemes.remove(0);
1353 let mut mailbox1 = setup_and_spawn_actor(
1354 &context,
1355 oracle.manager(),
1356 oracle.control(scheme.public_key()),
1357 scheme,
1358 connections.remove(0),
1359 cons1,
1360 Producer::default(),
1361 );
1362
1363 let scheme = schemes.remove(0);
1364 let _mailbox2 = setup_and_spawn_actor(
1365 &context,
1366 oracle.manager(),
1367 oracle.control(scheme.public_key()),
1368 scheme,
1369 connections.remove(0),
1370 Consumer::dummy(),
1371 Producer::default(), );
1373
1374 let scheme = schemes.remove(0);
1375 let _mailbox3 = setup_and_spawn_actor(
1376 &context,
1377 oracle.manager(),
1378 oracle.control(scheme.public_key()),
1379 scheme,
1380 connections.remove(0),
1381 Consumer::dummy(),
1382 prod3,
1383 );
1384
1385 context.sleep(Duration::from_millis(100)).await;
1387
1388 mailbox1.fetch(key.clone()).await;
1390
1391 context.sleep(Duration::from_millis(50)).await;
1393
1394 mailbox1
1397 .fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()])
1398 .await;
1399
1400 let event = cons_out1.recv().await.unwrap();
1403 match event {
1404 Event::Success(key_actual, value) => {
1405 assert_eq!(key_actual, key);
1406 assert_eq!(value, valid_data);
1407 }
1408 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1409 }
1410 });
1411 }
1412
1413 #[test_traced]
1414 fn test_retain() {
1415 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1416 executor.start(|context| async move {
1417 let (mut oracle, mut schemes, peers, mut connections) =
1418 setup_network_and_peers(&context, &[1, 2]).await;
1419
1420 let key = Key(5);
1421 let mut prod2 = Producer::default();
1422 prod2.insert(key.clone(), Bytes::from("data for key 5"));
1423
1424 let (cons1, mut cons_out1) = Consumer::new();
1425
1426 let scheme = schemes.remove(0);
1427 let mut mailbox1 = setup_and_spawn_actor(
1428 &context,
1429 oracle.manager(),
1430 oracle.control(scheme.public_key()),
1431 scheme,
1432 connections.remove(0),
1433 cons1,
1434 Producer::default(),
1435 );
1436
1437 let scheme = schemes.remove(0);
1438 let _mailbox2 = setup_and_spawn_actor(
1439 &context,
1440 oracle.manager(),
1441 oracle.control(scheme.public_key()),
1442 scheme,
1443 connections.remove(0),
1444 Consumer::dummy(),
1445 prod2,
1446 );
1447
1448 mailbox1.retain(|_| true).await;
1450 select! {
1451 _ = cons_out1.recv() => {
1452 panic!("unexpected event");
1453 },
1454 _ = context.sleep(Duration::from_millis(100)) => {},
1455 };
1456
1457 mailbox1.fetch(key.clone()).await;
1459
1460 let key_clone = key.clone();
1463 mailbox1.retain(move |k| k != &key_clone).await;
1464
1465 let event = cons_out1.recv().await.unwrap();
1467 match event {
1468 Event::Failed(key_actual) => {
1469 assert_eq!(key_actual, key);
1470 }
1471 Event::Success(_, _) => panic!("Fetch should have been retained out"),
1472 }
1473
1474 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1476
1477 mailbox1.fetch(key.clone()).await;
1480
1481 let event = cons_out1.recv().await.unwrap();
1483 match event {
1484 Event::Success(key_actual, value) => {
1485 assert_eq!(key_actual, key);
1486 assert_eq!(value, Bytes::from("data for key 5"));
1487 }
1488 Event::Failed(_) => unreachable!(),
1489 }
1490 });
1491 }
1492
1493 #[test_traced]
1494 fn test_clear() {
1495 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1496 executor.start(|context| async move {
1497 let (mut oracle, mut schemes, peers, mut connections) =
1498 setup_network_and_peers(&context, &[1, 2]).await;
1499
1500 let key = Key(6);
1502 let mut prod2 = Producer::default();
1503 prod2.insert(key.clone(), Bytes::from("data for key 6"));
1504
1505 let (cons1, mut cons_out1) = Consumer::new();
1506
1507 let scheme = schemes.remove(0);
1508 let mut mailbox1 = setup_and_spawn_actor(
1509 &context,
1510 oracle.manager(),
1511 oracle.control(scheme.public_key()),
1512 scheme,
1513 connections.remove(0),
1514 cons1,
1515 Producer::default(),
1516 );
1517
1518 let scheme = schemes.remove(0);
1519 let _mailbox2 = setup_and_spawn_actor(
1520 &context,
1521 oracle.manager(),
1522 oracle.control(scheme.public_key()),
1523 scheme,
1524 connections.remove(0),
1525 Consumer::dummy(),
1526 prod2,
1527 );
1528
1529 mailbox1.clear().await;
1531 select! {
1532 _ = cons_out1.recv() => {
1533 panic!("unexpected event");
1534 },
1535 _ = context.sleep(Duration::from_millis(100)) => {},
1536 };
1537
1538 mailbox1.fetch(key.clone()).await;
1540
1541 mailbox1.clear().await;
1543
1544 let event = cons_out1.recv().await.unwrap();
1546 match event {
1547 Event::Failed(key_actual) => {
1548 assert_eq!(key_actual, key);
1549 }
1550 Event::Success(_, _) => panic!("Fetch should have been cleared"),
1551 }
1552
1553 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1555
1556 mailbox1.fetch(key.clone()).await;
1559
1560 let event = cons_out1.recv().await.unwrap();
1562 match event {
1563 Event::Success(key_actual, value) => {
1564 assert_eq!(key_actual, key);
1565 assert_eq!(value, Bytes::from("data for key 6"));
1566 }
1567 Event::Failed(_) => unreachable!(),
1568 }
1569 });
1570 }
1571
1572 #[test_traced]
1576 fn test_rate_limit_spillover() {
1577 let executor = deterministic::Runner::timed(Duration::from_secs(30));
1578 executor.start(|context| async move {
1579 let (mut oracle, mut schemes, peers, mut connections) =
1581 setup_network_and_peers_with_rate_limit(
1582 &context,
1583 &[1, 2, 3],
1584 Quota::per_second(NZU32!(1)),
1585 )
1586 .await;
1587
1588 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1590 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1591
1592 let mut prod2 = Producer::default();
1594 let mut prod3 = Producer::default();
1595 prod2.insert(Key(0), Bytes::from("data for key 0"));
1596 prod2.insert(Key(1), Bytes::from("data for key 1"));
1597 prod3.insert(Key(0), Bytes::from("data for key 0"));
1598 prod3.insert(Key(1), Bytes::from("data for key 1"));
1599
1600 let (cons1, mut cons_out1) = Consumer::new();
1601
1602 let scheme = schemes.remove(0);
1604 let mut mailbox1 = setup_and_spawn_actor(
1605 &context,
1606 oracle.manager(),
1607 oracle.control(scheme.public_key()),
1608 scheme,
1609 connections.remove(0),
1610 cons1,
1611 Producer::default(),
1612 );
1613
1614 let scheme = schemes.remove(0);
1616 let _mailbox2 = setup_and_spawn_actor(
1617 &context,
1618 oracle.manager(),
1619 oracle.control(scheme.public_key()),
1620 scheme,
1621 connections.remove(0),
1622 Consumer::dummy(),
1623 prod2,
1624 );
1625
1626 let scheme = schemes.remove(0);
1628 let _mailbox3 = setup_and_spawn_actor(
1629 &context,
1630 oracle.manager(),
1631 oracle.control(scheme.public_key()),
1632 scheme,
1633 connections.remove(0),
1634 Consumer::dummy(),
1635 prod3,
1636 );
1637
1638 context.sleep(Duration::from_millis(100)).await;
1640 let start = context.current();
1641
1642 mailbox1.fetch(Key(0)).await;
1646 mailbox1.fetch(Key(1)).await;
1647
1648 let mut results = HashMap::new();
1650 for _ in 0..2 {
1651 let event = cons_out1.recv().await.unwrap();
1652 match event {
1653 Event::Success(key, value) => {
1654 results.insert(key.clone(), value);
1655 }
1656 Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
1657 }
1658 }
1659
1660 assert_eq!(results.len(), 2);
1662 assert_eq!(
1663 results.get(&Key(0)).unwrap(),
1664 &Bytes::from("data for key 0")
1665 );
1666 assert_eq!(
1667 results.get(&Key(1)).unwrap(),
1668 &Bytes::from("data for key 1")
1669 );
1670
1671 let elapsed = context.current().duration_since(start).unwrap();
1674 assert!(
1675 elapsed < Duration::from_millis(500),
1676 "Expected quick completion via spill-over, but took {elapsed:?}"
1677 );
1678 });
1679 }
1680
1681 #[test_traced]
1685 fn test_rate_limit_retry_after_reset() {
1686 let executor = deterministic::Runner::timed(Duration::from_secs(30));
1687 executor.start(|context| async move {
1688 let (mut oracle, mut schemes, peers, mut connections) =
1690 setup_network_and_peers_with_rate_limit(
1691 &context,
1692 &[1, 2],
1693 Quota::per_second(NZU32!(1)),
1694 )
1695 .await;
1696
1697 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1698
1699 let mut prod2 = Producer::default();
1701 prod2.insert(Key(1), Bytes::from("data for key 1"));
1702 prod2.insert(Key(2), Bytes::from("data for key 2"));
1703 prod2.insert(Key(3), Bytes::from("data for key 3"));
1704
1705 let (cons1, mut cons_out1) = Consumer::new();
1706
1707 let scheme = schemes.remove(0);
1708 let mut mailbox1 = setup_and_spawn_actor(
1709 &context,
1710 oracle.manager(),
1711 oracle.control(scheme.public_key()),
1712 scheme,
1713 connections.remove(0),
1714 cons1,
1715 Producer::default(),
1716 );
1717
1718 let scheme = schemes.remove(0);
1719 let _mailbox2 = setup_and_spawn_actor(
1720 &context,
1721 oracle.manager(),
1722 oracle.control(scheme.public_key()),
1723 scheme,
1724 connections.remove(0),
1725 Consumer::dummy(),
1726 prod2,
1727 );
1728
1729 context.sleep(Duration::from_millis(100)).await;
1731 let start = context.current();
1732
1733 mailbox1.fetch(Key(1)).await;
1736 mailbox1.fetch(Key(2)).await;
1737 mailbox1.fetch(Key(3)).await;
1738
1739 let mut results = HashMap::new();
1741 for _ in 0..3 {
1742 let event = cons_out1.recv().await.unwrap();
1743 match event {
1744 Event::Success(key, value) => {
1745 results.insert(key.clone(), value);
1746 }
1747 Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
1748 }
1749 }
1750
1751 assert_eq!(results.len(), 3);
1752 for i in 1..=3 {
1753 assert_eq!(
1754 results.get(&Key(i)).unwrap(),
1755 &Bytes::from(format!("data for key {}", i))
1756 );
1757 }
1758
1759 let elapsed = context.current().duration_since(start).unwrap();
1763 assert!(
1764 elapsed > Duration::from_secs(2),
1765 "Expected rate limiting to cause delay > 2s, but took {elapsed:?}"
1766 );
1767 });
1768 }
1769
1770 #[test_traced]
1774 fn test_self_exclusion() {
1775 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1776 executor.start(|context| async move {
1777 let (mut oracle, mut schemes, peers, mut connections) =
1778 setup_network_and_peers(&context, &[1, 2]).await;
1779
1780 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1781
1782 let key = Key(1);
1783 let data = Bytes::from("shared data");
1784
1785 let mut prod1 = Producer::default();
1787 prod1.insert(key.clone(), data.clone());
1788 let mut prod2 = Producer::default();
1789 prod2.insert(key.clone(), data.clone());
1790
1791 let (cons1, mut cons_out1) = Consumer::new();
1792
1793 let scheme = schemes.remove(0);
1795 let mut mailbox1 = setup_and_spawn_actor(
1796 &context,
1797 oracle.manager(),
1798 oracle.control(scheme.public_key()),
1799 scheme,
1800 connections.remove(0),
1801 cons1,
1802 prod1, );
1804
1805 let scheme = schemes.remove(0);
1807 let _mailbox2 = setup_and_spawn_actor(
1808 &context,
1809 oracle.manager(),
1810 oracle.control(scheme.public_key()),
1811 scheme,
1812 connections.remove(0),
1813 Consumer::dummy(),
1814 prod2,
1815 );
1816
1817 context.sleep(Duration::from_millis(100)).await;
1819
1820 mailbox1.fetch(key.clone()).await;
1822
1823 let event = cons_out1.recv().await.unwrap();
1825 match event {
1826 Event::Success(key_actual, value) => {
1827 assert_eq!(key_actual, key);
1828 assert_eq!(value, data);
1829 }
1830 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1831 }
1832 });
1833 }
1834
1835 #[test_traced]
1836 fn test_fetch_uses_primary_peers_only() {
1837 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1838 executor.start(|context| async move {
1839 let (network, oracle) = Network::new(
1840 context.with_label("network"),
1841 commonware_p2p::simulated::Config {
1842 max_size: 1024 * 1024,
1843 disconnect_on_block: true,
1844 tracked_peer_sets: NZUsize!(1),
1845 },
1846 );
1847 network.start();
1848
1849 let schemes: Vec<PrivateKey> = [1u64, 2, 3]
1850 .into_iter()
1851 .map(PrivateKey::from_seed)
1852 .collect();
1853 let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
1854 let mut schemes = schemes;
1855
1856 let mut connections = Vec::new();
1857 for peer in &peers {
1858 let (sender, receiver) = oracle
1859 .control(peer.clone())
1860 .register(0, Quota::per_second(RATE_LIMIT))
1861 .await
1862 .unwrap();
1863 connections.push((sender, receiver));
1864 }
1865
1866 let mut oracle = oracle;
1870 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1871 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1872
1873 oracle
1874 .manager()
1875 .track(
1876 1,
1877 TrackedPeers::new(
1878 Set::try_from([peers[1].clone()]).unwrap(),
1879 Set::try_from([peers[2].clone()]).unwrap(),
1880 ),
1881 )
1882 .await;
1883 context.sleep(Duration::from_millis(100)).await;
1884
1885 let key = Key(1);
1886 let data = Bytes::from("secondary only data");
1887
1888 let (cons1, mut cons_out1) = Consumer::new();
1889
1890 let scheme = schemes.remove(0);
1892 let mut mailbox1 = setup_and_spawn_actor(
1893 &context,
1894 oracle.manager(),
1895 oracle.control(scheme.public_key()),
1896 scheme,
1897 connections.remove(0),
1898 cons1,
1899 Producer::default(),
1900 );
1901
1902 let scheme = schemes.remove(0);
1904 let _mailbox2 = setup_and_spawn_actor(
1905 &context,
1906 oracle.manager(),
1907 oracle.control(scheme.public_key()),
1908 scheme,
1909 connections.remove(0),
1910 Consumer::dummy(),
1911 Producer::default(),
1912 );
1913
1914 let mut prod3 = Producer::default();
1916 prod3.insert(key.clone(), data);
1917 let scheme = schemes.remove(0);
1918 let _mailbox3 = setup_and_spawn_actor(
1919 &context,
1920 oracle.manager(),
1921 oracle.control(scheme.public_key()),
1922 scheme,
1923 connections.remove(0),
1924 Consumer::dummy(),
1925 prod3,
1926 );
1927
1928 mailbox1.fetch(key.clone()).await;
1931
1932 select! {
1933 event = cons_out1.recv() => {
1934 panic!("fetch should not succeed from a secondary peer, got: {event:?}");
1935 },
1936 _ = context.sleep(Duration::from_secs(2)) => {},
1937 }
1938 });
1939 }
1940
1941 #[test_traced]
1942 fn test_fetch_uses_latest_primary_set_only() {
1943 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1944 executor.start(|context| async move {
1945 let (network, oracle) = Network::new(
1946 context.with_label("network"),
1947 commonware_p2p::simulated::Config {
1948 max_size: 1024 * 1024,
1949 disconnect_on_block: true,
1950 tracked_peer_sets: NZUsize!(2),
1951 },
1952 );
1953 network.start();
1954
1955 let schemes: Vec<PrivateKey> = [1u64, 2, 3]
1956 .into_iter()
1957 .map(PrivateKey::from_seed)
1958 .collect();
1959 let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
1960 let mut schemes = schemes;
1961
1962 let mut connections = Vec::new();
1963 for peer in &peers {
1964 let (sender, receiver) = oracle
1965 .control(peer.clone())
1966 .register(0, Quota::per_second(RATE_LIMIT))
1967 .await
1968 .unwrap();
1969 connections.push((sender, receiver));
1970 }
1971
1972 let mut oracle = oracle;
1973 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1974 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1975
1976 oracle
1980 .manager()
1981 .track(
1982 0,
1983 Set::try_from([peers[0].clone(), peers[1].clone()]).unwrap(),
1984 )
1985 .await;
1986 context.sleep(Duration::from_millis(100)).await;
1987
1988 let key = Key(7);
1989 let targeted_key = Key(8);
1990 let data = Bytes::from("old primary data");
1991
1992 let (cons1, mut cons_out1) = Consumer::new();
1993
1994 let scheme = schemes.remove(0);
1996 let mut mailbox1 = setup_and_spawn_actor(
1997 &context,
1998 oracle.manager(),
1999 oracle.control(scheme.public_key()),
2000 scheme,
2001 connections.remove(0),
2002 cons1,
2003 Producer::default(),
2004 );
2005
2006 let mut prod2 = Producer::default();
2008 prod2.insert(key.clone(), data.clone());
2009 prod2.insert(targeted_key.clone(), data);
2010 let scheme = schemes.remove(0);
2011 let _mailbox2 = setup_and_spawn_actor(
2012 &context,
2013 oracle.manager(),
2014 oracle.control(scheme.public_key()),
2015 scheme,
2016 connections.remove(0),
2017 Consumer::dummy(),
2018 prod2,
2019 );
2020
2021 let scheme = schemes.remove(0);
2023 let _mailbox3 = setup_and_spawn_actor(
2024 &context,
2025 oracle.manager(),
2026 oracle.control(scheme.public_key()),
2027 scheme,
2028 connections.remove(0),
2029 Consumer::dummy(),
2030 Producer::default(),
2031 );
2032
2033 context.sleep(Duration::from_millis(100)).await;
2034
2035 oracle
2039 .manager()
2040 .track(
2041 1,
2042 Set::try_from([peers[0].clone(), peers[2].clone()]).unwrap(),
2043 )
2044 .await;
2045 context.sleep(Duration::from_millis(100)).await;
2046
2047 mailbox1.fetch(key).await;
2048
2049 select! {
2050 event = cons_out1.recv() => {
2051 panic!(
2052 "fetch should not succeed from an old primary retained only in the overlap window, got: {event:?}"
2053 );
2054 },
2055 _ = context.sleep(Duration::from_secs(1)) => {},
2056 }
2057
2058 mailbox1
2060 .fetch_targeted(targeted_key, non_empty_vec![peers[1].clone()])
2061 .await;
2062
2063 select! {
2064 event = cons_out1.recv() => {
2065 panic!(
2066 "targeted fetch should not bypass the latest-primary filter, got: {event:?}"
2067 );
2068 },
2069 _ = context.sleep(Duration::from_secs(1)) => {},
2070 }
2071 });
2072 }
2073
2074 #[test_traced]
2075 fn test_fetch_after_cutover_relies_on_latest_primary_history() {
2076 let executor = deterministic::Runner::timed(Duration::from_secs(10));
2077 executor.start(|context| async move {
2078 let (network, oracle) = Network::new(
2079 context.with_label("network"),
2080 commonware_p2p::simulated::Config {
2081 max_size: 1024 * 1024,
2082 disconnect_on_block: true,
2083 tracked_peer_sets: NZUsize!(2),
2084 },
2085 );
2086 network.start();
2087
2088 let schemes: Vec<PrivateKey> = [1u64, 2, 3]
2089 .into_iter()
2090 .map(PrivateKey::from_seed)
2091 .collect();
2092 let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
2093 let mut schemes = schemes;
2094
2095 let mut connections = Vec::new();
2096 for peer in &peers {
2097 let (sender, receiver) = oracle
2098 .control(peer.clone())
2099 .register(0, Quota::per_second(RATE_LIMIT))
2100 .await
2101 .unwrap();
2102 connections.push((sender, receiver));
2103 }
2104
2105 let mut oracle = oracle;
2106 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2107 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
2108
2109 oracle
2112 .manager()
2113 .track(
2114 0,
2115 Set::try_from([peers[0].clone(), peers[1].clone()]).unwrap(),
2116 )
2117 .await;
2118 context.sleep(Duration::from_millis(100)).await;
2119
2120 let key = Key(9);
2121 let invalid_history = Bytes::from("stale overlap history");
2122 let valid_history = Bytes::from("latest primary history");
2123
2124 let (mut cons1, mut cons_out1) = Consumer::new();
2125 cons1.add_expected(key.clone(), valid_history.clone());
2126
2127 let scheme = schemes.remove(0);
2129 let mut mailbox1 = setup_and_spawn_actor(
2130 &context,
2131 oracle.manager(),
2132 oracle.control(scheme.public_key()),
2133 scheme,
2134 connections.remove(0),
2135 cons1,
2136 Producer::default(),
2137 );
2138
2139 let mut prod2 = Producer::default();
2142 prod2.insert(key.clone(), invalid_history);
2143 let scheme = schemes.remove(0);
2144 let _mailbox2 = setup_and_spawn_actor(
2145 &context,
2146 oracle.manager(),
2147 oracle.control(scheme.public_key()),
2148 scheme,
2149 connections.remove(0),
2150 Consumer::dummy(),
2151 prod2,
2152 );
2153
2154 let mut prod3 = Producer::default();
2156 prod3.insert(key.clone(), valid_history.clone());
2157 let scheme = schemes.remove(0);
2158 let _mailbox3 = setup_and_spawn_actor(
2159 &context,
2160 oracle.manager(),
2161 oracle.control(scheme.public_key()),
2162 scheme,
2163 connections.remove(0),
2164 Consumer::dummy(),
2165 prod3,
2166 );
2167
2168 context.sleep(Duration::from_millis(100)).await;
2169
2170 oracle
2171 .manager()
2172 .track(
2173 1,
2174 Set::try_from([peers[0].clone(), peers[2].clone()]).unwrap(),
2175 )
2176 .await;
2177 context.sleep(Duration::from_millis(100)).await;
2178
2179 mailbox1.fetch(key.clone()).await;
2180
2181 let event = cons_out1.recv().await.unwrap();
2182 match event {
2183 Event::Success(key_actual, value) => {
2184 assert_eq!(key_actual, key);
2185 assert_eq!(value, valid_history);
2186 }
2187 Event::Failed(_) => panic!("fetch failed unexpectedly"),
2188 }
2189
2190 assert!(
2191 oracle.blocked().await.unwrap().is_empty(),
2192 "overlap-only peers should not be queried for post-cutover history"
2193 );
2194 });
2195 }
2196
2197 #[test_traced]
2198 fn test_secondary_peer_requests_are_served() {
2199 let executor = deterministic::Runner::timed(Duration::from_secs(10));
2200 executor.start(|context| async move {
2201 let (mut oracle, mut schemes, peers, mut connections) =
2202 setup_network_and_peers(&context, &[1, 2]).await;
2203
2204 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2209
2210 oracle
2211 .manager()
2212 .track(
2213 1,
2214 TrackedPeers::new(
2215 Set::try_from([peers[0].clone()]).unwrap(),
2216 Set::try_from([peers[1].clone()]).unwrap(),
2217 ),
2218 )
2219 .await;
2220 context.sleep(Duration::from_millis(100)).await;
2221
2222 let key = Key(9);
2223 let data = Bytes::from("served to secondary");
2224
2225 let mut prod1 = Producer::default();
2227 prod1.insert(key.clone(), data.clone());
2228
2229 let scheme = schemes.remove(0);
2230 let _mailbox1 = setup_and_spawn_actor(
2231 &context,
2232 oracle.manager(),
2233 oracle.control(scheme.public_key()),
2234 scheme,
2235 connections.remove(0),
2236 Consumer::dummy(),
2237 prod1,
2238 );
2239
2240 let (mut cons2, mut cons_out2) = Consumer::new();
2242 cons2.add_expected(key.clone(), data.clone());
2243 let scheme = schemes.remove(0);
2244 let mut mailbox2 = setup_and_spawn_actor(
2245 &context,
2246 oracle.manager(),
2247 oracle.control(scheme.public_key()),
2248 scheme,
2249 connections.remove(0),
2250 cons2,
2251 Producer::default(),
2252 );
2253
2254 mailbox2
2255 .fetch_targeted(key.clone(), non_empty_vec![peers[0].clone()])
2256 .await;
2257
2258 let event = cons_out2.recv().await.unwrap();
2259 match event {
2260 Event::Success(key_actual, value) => {
2261 assert_eq!(key_actual, key);
2262 assert_eq!(value, data);
2263 }
2264 Event::Failed(_) => panic!("secondary peer request should have been served"),
2265 }
2266 });
2267 }
2268
2269 #[allow(clippy::type_complexity)]
2270 fn spawn_actors_with_handles(
2271 context: deterministic::Context,
2272 oracle: &Oracle<PublicKey, deterministic::Context>,
2273 schemes: Vec<PrivateKey>,
2274 connections: Vec<(
2275 Sender<PublicKey, deterministic::Context>,
2276 Receiver<PublicKey>,
2277 )>,
2278 consumers: Vec<Consumer<Key, Bytes>>,
2279 producers: Vec<Producer<Key, Bytes>>,
2280 ) -> (
2281 Vec<Mailbox<Key, PublicKey>>,
2282 Vec<commonware_runtime::Handle<()>>,
2283 ) {
2284 let actor_context = context.with_label("actor");
2285 let mut mailboxes = Vec::new();
2286 let mut handles = Vec::new();
2287
2288 for (idx, ((scheme, conn), (consumer, producer))) in schemes
2289 .into_iter()
2290 .zip(connections)
2291 .zip(consumers.into_iter().zip(producers))
2292 .enumerate()
2293 {
2294 let ctx = actor_context.with_label(&format!("peer_{idx}"));
2295 let public_key = scheme.public_key();
2296 let (engine, mailbox) = Engine::new(
2297 ctx,
2298 Config {
2299 peer_provider: oracle.manager(),
2300 blocker: oracle.control(public_key.clone()),
2301 consumer,
2302 producer,
2303 mailbox_size: MAILBOX_SIZE,
2304 me: Some(public_key),
2305 initial: INITIAL_DURATION,
2306 timeout: TIMEOUT,
2307 fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
2308 priority_requests: false,
2309 priority_responses: false,
2310 },
2311 );
2312 handles.push(engine.start(conn));
2313 mailboxes.push(mailbox);
2314 }
2315
2316 (mailboxes, handles)
2317 }
2318
2319 #[test_traced]
2320 fn test_operations_after_shutdown_do_not_panic() {
2321 let executor = deterministic::Runner::timed(Duration::from_secs(10));
2322 executor.start(|context| async move {
2323 let (mut oracle, schemes, peers, connections) =
2324 setup_network_and_peers(&context, &[1, 2]).await;
2325
2326 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2327
2328 let key = Key(1);
2329 let mut prod2 = Producer::default();
2330 prod2.insert(key.clone(), Bytes::from("data for key 1"));
2331
2332 let (cons1, mut cons_out1) = Consumer::new();
2333
2334 let (mut mailboxes, handles) = spawn_actors_with_handles(
2335 context.clone(),
2336 &oracle,
2337 schemes,
2338 connections,
2339 vec![cons1, Consumer::dummy()],
2340 vec![Producer::default(), prod2],
2341 );
2342
2343 mailboxes[0].fetch(key.clone()).await;
2345 let event = cons_out1.recv().await.unwrap();
2346 match event {
2347 Event::Success(_, value) => assert_eq!(value, Bytes::from("data for key 1")),
2348 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
2349 }
2350
2351 for handle in handles {
2353 handle.abort();
2354 }
2355 context.sleep(Duration::from_millis(100)).await;
2356
2357 let key2 = Key(2);
2361 mailboxes[0].fetch(key2.clone()).await;
2362
2363 mailboxes[0].cancel(key2.clone()).await;
2365
2366 mailboxes[0].clear().await;
2368
2369 mailboxes[0].retain(|_| true).await;
2371
2372 mailboxes[0]
2374 .fetch_targeted(Key(3), non_empty_vec![peers[1].clone()])
2375 .await;
2376 });
2377 }
2378
2379 fn clean_shutdown(seed: u64) {
2380 let cfg = deterministic::Config::default()
2381 .with_seed(seed)
2382 .with_timeout(Some(Duration::from_secs(30)));
2383 let executor = deterministic::Runner::new(cfg);
2384 executor.start(|context| async move {
2385 let (mut oracle, schemes, peers, connections) =
2386 setup_network_and_peers(&context, &[1, 2]).await;
2387
2388 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2389
2390 let key = Key(1);
2391 let mut prod2 = Producer::default();
2392 prod2.insert(key.clone(), Bytes::from("data for key 1"));
2393
2394 let (cons1, mut cons_out1) = Consumer::new();
2395
2396 let (mut mailboxes, handles) = spawn_actors_with_handles(
2397 context.clone(),
2398 &oracle,
2399 schemes,
2400 connections,
2401 vec![cons1, Consumer::dummy()],
2402 vec![Producer::default(), prod2],
2403 );
2404
2405 context.sleep(Duration::from_millis(100)).await;
2407
2408 let running_before = count_running_tasks(&context, "actor");
2410 assert!(
2411 running_before > 0,
2412 "at least one actor task should be running"
2413 );
2414
2415 mailboxes[0].fetch(key.clone()).await;
2417 let event = cons_out1.recv().await.unwrap();
2418 match event {
2419 Event::Success(_, value) => assert_eq!(value, Bytes::from("data for key 1")),
2420 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
2421 }
2422
2423 for handle in handles {
2425 handle.abort();
2426 }
2427 context.sleep(Duration::from_millis(100)).await;
2428
2429 let running_after = count_running_tasks(&context, "actor");
2431 assert_eq!(
2432 running_after, 0,
2433 "all actor tasks should be stopped, but {running_after} still running"
2434 );
2435 });
2436 }
2437
/// Runs the `clean_shutdown` scenario once for each seed in `0..25`.
#[test]
fn test_clean_shutdown() {
    (0..25).for_each(clean_shutdown);
}
2444}