1use bytes::Bytes;
44use commonware_utils::{channel::oneshot, Span};
45use std::future::Future;
46
47mod config;
48pub use config::Config;
49mod engine;
50pub use engine::Engine;
51mod fetcher;
52mod ingress;
53pub use ingress::Mailbox;
54mod metrics;
55mod wire;
56
57#[cfg(feature = "mocks")]
58pub mod mocks;
59
60pub trait Producer: Clone + Send + 'static {
62 type Key: Span;
64
65 fn produce(&mut self, key: Self::Key) -> impl Future<Output = oneshot::Receiver<Bytes>> + Send;
67}
68
69#[cfg(test)]
70mod tests {
71 use super::{
72 mocks::{Consumer, Event, Key, Producer},
73 Config, Engine, Mailbox,
74 };
75 use crate::Resolver;
76 use bytes::Bytes;
77 use commonware_cryptography::{
78 ed25519::{PrivateKey, PublicKey},
79 Signer,
80 };
81 use commonware_macros::{select, test_traced};
82 use commonware_p2p::{
83 simulated::{Link, Network, Oracle, Receiver, Sender},
84 Blocker, Manager, Provider,
85 };
86 use commonware_runtime::{count_running_tasks, deterministic, Clock, Metrics, Quota, Runner};
87 use commonware_utils::{non_empty_vec, NZU32};
88 use std::{collections::HashMap, num::NonZeroU32, time::Duration};
89
90 const MAILBOX_SIZE: usize = 1024;
91 const RATE_LIMIT: NonZeroU32 = NZU32!(10);
92 const INITIAL_DURATION: Duration = Duration::from_millis(100);
93 const TIMEOUT: Duration = Duration::from_millis(400);
94 const FETCH_RETRY_TIMEOUT: Duration = Duration::from_millis(100);
95 const LINK: Link = Link {
96 latency: Duration::from_millis(10),
97 jitter: Duration::from_millis(1),
98 success_rate: 1.0,
99 };
100 const LINK_UNRELIABLE: Link = Link {
101 latency: Duration::from_millis(10),
102 jitter: Duration::from_millis(1),
103 success_rate: 0.5,
104 };
105
106 async fn setup_network_and_peers(
107 context: &deterministic::Context,
108 peer_seeds: &[u64],
109 ) -> (
110 Oracle<PublicKey, deterministic::Context>,
111 Vec<PrivateKey>,
112 Vec<PublicKey>,
113 Vec<(
114 Sender<PublicKey, deterministic::Context>,
115 Receiver<PublicKey>,
116 )>,
117 ) {
118 setup_network_and_peers_with_rate_limit(context, peer_seeds, Quota::per_second(RATE_LIMIT))
119 .await
120 }
121
122 async fn setup_network_and_peers_with_rate_limit(
123 context: &deterministic::Context,
124 peer_seeds: &[u64],
125 rate_limit: Quota,
126 ) -> (
127 Oracle<PublicKey, deterministic::Context>,
128 Vec<PrivateKey>,
129 Vec<PublicKey>,
130 Vec<(
131 Sender<PublicKey, deterministic::Context>,
132 Receiver<PublicKey>,
133 )>,
134 ) {
135 let (network, oracle) = Network::new(
136 context.with_label("network"),
137 commonware_p2p::simulated::Config {
138 max_size: 1024 * 1024,
139 disconnect_on_block: true,
140 tracked_peer_sets: Some(3),
141 },
142 );
143 network.start();
144
145 let schemes: Vec<PrivateKey> = peer_seeds
146 .iter()
147 .map(|seed| PrivateKey::from_seed(*seed))
148 .collect();
149 let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
150 let mut manager = oracle.manager();
151 manager.track(0, peers.clone().try_into().unwrap()).await;
152
153 let mut connections = Vec::new();
154 for peer in &peers {
155 let (sender, receiver) = oracle
156 .control(peer.clone())
157 .register(0, rate_limit)
158 .await
159 .unwrap();
160 connections.push((sender, receiver));
161 }
162
163 (oracle, schemes, peers, connections)
164 }
165
166 async fn add_link(
167 oracle: &mut Oracle<PublicKey, deterministic::Context>,
168 link: Link,
169 peers: &[PublicKey],
170 from: usize,
171 to: usize,
172 ) {
173 oracle
174 .add_link(peers[from].clone(), peers[to].clone(), link.clone())
175 .await
176 .unwrap();
177 oracle
178 .add_link(peers[to].clone(), peers[from].clone(), link)
179 .await
180 .unwrap();
181 }
182
183 async fn setup_and_spawn_actor(
184 context: &deterministic::Context,
185 provider: impl Provider<PublicKey = PublicKey>,
186 blocker: impl Blocker<PublicKey = PublicKey>,
187 signer: impl Signer<PublicKey = PublicKey>,
188 connection: (
189 Sender<PublicKey, deterministic::Context>,
190 Receiver<PublicKey>,
191 ),
192 consumer: Consumer<Key, Bytes>,
193 producer: Producer<Key, Bytes>,
194 ) -> Mailbox<Key, PublicKey> {
195 let public_key = signer.public_key();
196 let (engine, mailbox) = Engine::new(
197 context.with_label(&format!("actor_{public_key}")),
198 Config {
199 provider,
200 blocker,
201 consumer,
202 producer,
203 mailbox_size: MAILBOX_SIZE,
204 me: Some(public_key),
205 initial: INITIAL_DURATION,
206 timeout: TIMEOUT,
207 fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
208 priority_requests: false,
209 priority_responses: false,
210 },
211 );
212 engine.start(connection);
213
214 mailbox
215 }
216
217 #[test_traced]
221 fn test_fetch_success() {
222 let executor = deterministic::Runner::timed(Duration::from_secs(10));
223 executor.start(|context| async move {
224 let (mut oracle, mut schemes, peers, mut connections) =
225 setup_network_and_peers(&context, &[1, 2]).await;
226
227 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
228
229 let key = Key(2);
230 let mut prod2 = Producer::default();
231 prod2.insert(key.clone(), Bytes::from("data for key 2"));
232
233 let (cons1, mut cons_out1) = Consumer::new();
234
235 let scheme = schemes.remove(0);
236 let mut mailbox1 = setup_and_spawn_actor(
237 &context,
238 oracle.manager(),
239 oracle.control(scheme.public_key()),
240 scheme,
241 connections.remove(0),
242 cons1,
243 Producer::default(),
244 )
245 .await;
246
247 let scheme = schemes.remove(0);
248 let _mailbox2 = setup_and_spawn_actor(
249 &context,
250 oracle.manager(),
251 oracle.control(scheme.public_key()),
252 scheme,
253 connections.remove(0),
254 Consumer::dummy(),
255 prod2,
256 )
257 .await;
258
259 mailbox1.fetch(key.clone()).await;
260
261 let event = cons_out1.recv().await.unwrap();
262 match event {
263 Event::Success(key_actual, value) => {
264 assert_eq!(key_actual, key);
265 assert_eq!(value, Bytes::from("data for key 2"));
266 }
267 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
268 }
269 });
270 }
271
272 #[test_traced]
276 fn test_cancel_fetch() {
277 let executor = deterministic::Runner::timed(Duration::from_secs(10));
278 executor.start(|context| async move {
279 let (oracle, mut schemes, _peers, mut connections) =
280 setup_network_and_peers(&context, &[1]).await;
281
282 let (cons1, mut cons_out1) = Consumer::new();
283 let prod1 = Producer::default();
284
285 let scheme = schemes.remove(0);
286 let mut mailbox1 = setup_and_spawn_actor(
287 &context,
288 oracle.manager(),
289 oracle.control(scheme.public_key()),
290 scheme,
291 connections.remove(0),
292 cons1,
293 prod1,
294 )
295 .await;
296
297 let key = Key(3);
298 mailbox1.fetch(key.clone()).await;
299 mailbox1.cancel(key.clone()).await;
300
301 let event = cons_out1.recv().await.unwrap();
302 match event {
303 Event::Failed(key_actual) => {
304 assert_eq!(key_actual, key);
305 }
306 Event::Success(_, _) => panic!("Fetch should have been canceled"),
307 }
308 });
309 }
310
311 #[test_traced]
316 fn test_peer_no_data() {
317 let executor = deterministic::Runner::timed(Duration::from_secs(10));
318 executor.start(|context| async move {
319 let (mut oracle, mut schemes, peers, mut connections) =
320 setup_network_and_peers(&context, &[1, 2, 3]).await;
321
322 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
323 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
324
325 let prod1 = Producer::default();
326 let prod2 = Producer::default();
327 let mut prod3 = Producer::default();
328 let key = Key(3);
329 prod3.insert(key.clone(), Bytes::from("data for key 3"));
330
331 let (cons1, mut cons_out1) = Consumer::new();
332
333 let scheme = schemes.remove(0);
334 let mut mailbox1 = setup_and_spawn_actor(
335 &context,
336 oracle.manager(),
337 oracle.control(scheme.public_key()),
338 scheme,
339 connections.remove(0),
340 cons1,
341 prod1,
342 )
343 .await;
344
345 let scheme = schemes.remove(0);
346 let _mailbox2 = setup_and_spawn_actor(
347 &context,
348 oracle.manager(),
349 oracle.control(scheme.public_key()),
350 scheme,
351 connections.remove(0),
352 Consumer::dummy(),
353 prod2,
354 )
355 .await;
356
357 let scheme = schemes.remove(0);
358 let _mailbox3 = setup_and_spawn_actor(
359 &context,
360 oracle.manager(),
361 oracle.control(scheme.public_key()),
362 scheme,
363 connections.remove(0),
364 Consumer::dummy(),
365 prod3,
366 )
367 .await;
368
369 mailbox1.fetch(key.clone()).await;
370
371 let event = cons_out1.recv().await.unwrap();
372 match event {
373 Event::Success(key_actual, value) => {
374 assert_eq!(key_actual, key);
375 assert_eq!(value, Bytes::from("data for key 3"));
376 }
377 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
378 }
379 });
380 }
381
382 #[test_traced]
387 fn test_no_peers_available() {
388 let executor = deterministic::Runner::timed(Duration::from_secs(10));
389 executor.start(|context| async move {
390 let (oracle, mut schemes, _peers, mut connections) =
391 setup_network_and_peers(&context, &[1]).await;
392
393 let (cons1, mut cons_out1) = Consumer::new();
394 let prod1 = Producer::default();
395
396 let scheme = schemes.remove(0);
397 let mut mailbox1 = setup_and_spawn_actor(
398 &context,
399 oracle.manager(),
400 oracle.control(scheme.public_key()),
401 scheme,
402 connections.remove(0),
403 cons1,
404 prod1,
405 )
406 .await;
407
408 let key = Key(4);
409 mailbox1.fetch(key.clone()).await;
410 context.sleep(Duration::from_secs(5)).await;
411 mailbox1.cancel(key.clone()).await;
412
413 let event = cons_out1.recv().await.expect("Consumer channel closed");
414 match event {
415 Event::Failed(key_actual) => {
416 assert_eq!(key_actual, key);
417 }
418 Event::Success(_, _) => {
419 panic!("Fetch should have failed due to no peers")
420 }
421 }
422 });
423 }
424
425 #[test_traced]
429 fn test_concurrent_fetch_requests() {
430 let executor = deterministic::Runner::timed(Duration::from_secs(60));
431 executor.start(|context| async move {
432 let (mut oracle, mut schemes, peers, mut connections) =
433 setup_network_and_peers(&context, &[1, 2, 3]).await;
434
435 let key2 = Key(2);
436 let key3 = Key(3);
437 let mut prod2 = Producer::default();
438 prod2.insert(key2.clone(), Bytes::from("data for key 2"));
439 let mut prod3 = Producer::default();
440 prod3.insert(key3.clone(), Bytes::from("data for key 3"));
441
442 let (cons1, mut cons_out1) = Consumer::new();
443
444 let scheme = schemes.remove(0);
445 let mut mailbox1 = setup_and_spawn_actor(
446 &context,
447 oracle.manager(),
448 oracle.control(scheme.public_key()),
449 scheme,
450 connections.remove(0),
451 cons1,
452 Producer::default(),
453 )
454 .await;
455
456 let scheme = schemes.remove(0);
457 let _mailbox2 = setup_and_spawn_actor(
458 &context,
459 oracle.manager(),
460 oracle.control(scheme.public_key()),
461 scheme,
462 connections.remove(0),
463 Consumer::dummy(),
464 prod2,
465 )
466 .await;
467
468 let scheme = schemes.remove(0);
469 let _mailbox3 = setup_and_spawn_actor(
470 &context,
471 oracle.manager(),
472 oracle.control(scheme.public_key()),
473 scheme,
474 connections.remove(0),
475 Consumer::dummy(),
476 prod3,
477 )
478 .await;
479
480 add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 1).await;
482 add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 2).await;
483
484 for _ in 0..10 {
486 mailbox1.fetch(key2.clone()).await;
488 mailbox1.fetch(key3.clone()).await;
489
490 let mut events = Vec::new();
492 events.push(cons_out1.recv().await.expect("Consumer channel closed"));
493 events.push(cons_out1.recv().await.expect("Consumer channel closed"));
494
495 let mut found_key2 = false;
497 let mut found_key3 = false;
498 for event in events {
499 match event {
500 Event::Success(key_actual, value) => {
501 if key_actual == key2 {
502 assert_eq!(value, Bytes::from("data for key 2"));
503 found_key2 = true;
504 } else if key_actual == key3 {
505 assert_eq!(value, Bytes::from("data for key 3"));
506 found_key3 = true;
507 } else {
508 panic!("Unexpected key received");
509 }
510 }
511 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
512 }
513 }
514 assert!(found_key2 && found_key3,);
515 }
516 });
517 }
518
519 #[test_traced]
522 fn test_cancel() {
523 let executor = deterministic::Runner::timed(Duration::from_secs(10));
524 executor.start(|context| async move {
525 let (mut oracle, mut schemes, peers, mut connections) =
526 setup_network_and_peers(&context, &[1, 2]).await;
527
528 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
529
530 let key = Key(6);
531 let mut prod2 = Producer::default();
532 prod2.insert(key.clone(), Bytes::from("data for key 6"));
533
534 let (cons1, mut cons_out1) = Consumer::new();
535
536 let scheme = schemes.remove(0);
537 let mut mailbox1 = setup_and_spawn_actor(
538 &context,
539 oracle.manager(),
540 oracle.control(scheme.public_key()),
541 scheme,
542 connections.remove(0),
543 cons1,
544 Producer::default(),
545 )
546 .await;
547
548 let scheme = schemes.remove(0);
549 let _mailbox2 = setup_and_spawn_actor(
550 &context,
551 oracle.manager(),
552 oracle.control(scheme.public_key()),
553 scheme,
554 connections.remove(0),
555 Consumer::dummy(),
556 prod2,
557 )
558 .await;
559
560 mailbox1.cancel(key.clone()).await;
562 select! {
563 _ = cons_out1.recv() => {
564 panic!("unexpected event");
565 },
566 _ = context.sleep(Duration::from_millis(100)) => {},
567 };
568
569 mailbox1.fetch(key.clone()).await;
571 let event = cons_out1.recv().await.unwrap();
572 match event {
573 Event::Success(key_actual, value) => {
574 assert_eq!(key_actual, key);
575 assert_eq!(value, Bytes::from("data for key 6"));
576 }
577 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
578 }
579
580 mailbox1.cancel(key.clone()).await;
582 select! {
583 _ = cons_out1.recv() => {
584 panic!("unexpected event");
585 },
586 _ = context.sleep(Duration::from_millis(100)) => {},
587 };
588
589 let key = Key(7);
591 mailbox1.fetch(key.clone()).await;
592 mailbox1.cancel(key.clone()).await;
593
594 let event = cons_out1.recv().await.unwrap();
596 match event {
597 Event::Failed(key_actual) => {
598 assert_eq!(key_actual, key);
599 }
600 Event::Success(_, _) => panic!("Fetch should have been canceled"),
601 }
602 });
603 }
604
605 #[test_traced]
608 fn test_blocking_peer() {
609 let executor = deterministic::Runner::timed(Duration::from_secs(10));
610 executor.start(|context| async move {
611 let (mut oracle, mut schemes, peers, mut connections) =
612 setup_network_and_peers(&context, &[1, 2, 3]).await;
613
614 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
615 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
616 add_link(&mut oracle, LINK.clone(), &peers, 1, 2).await;
617
618 let key_a = Key(1);
619 let key_b = Key(2);
620 let invalid_data_a = Bytes::from("invalid for A");
621 let valid_data_a = Bytes::from("valid for A");
622 let valid_data_b = Bytes::from("valid for B");
623
624 let mut prod2 = Producer::default();
626 prod2.insert(key_a.clone(), invalid_data_a.clone());
627 prod2.insert(key_b.clone(), valid_data_b.clone());
628
629 let mut prod3 = Producer::default();
630 prod3.insert(key_a.clone(), valid_data_a.clone());
631
632 let (mut cons1, mut cons_out1) = Consumer::new();
634 cons1.add_expected(key_a.clone(), valid_data_a.clone());
635 cons1.add_expected(key_b.clone(), valid_data_b.clone());
636
637 let scheme = schemes.remove(0);
639 let mut mailbox1 = setup_and_spawn_actor(
640 &context,
641 oracle.manager(),
642 oracle.control(scheme.public_key()),
643 scheme,
644 connections.remove(0),
645 cons1,
646 Producer::default(),
647 )
648 .await;
649
650 let scheme = schemes.remove(0);
651 let _mailbox2 = setup_and_spawn_actor(
652 &context,
653 oracle.manager(),
654 oracle.control(scheme.public_key()),
655 scheme,
656 connections.remove(0),
657 Consumer::dummy(),
658 prod2,
659 )
660 .await;
661
662 let scheme = schemes.remove(0);
663 let _mailbox3 = setup_and_spawn_actor(
664 &context,
665 oracle.manager(),
666 oracle.control(scheme.public_key()),
667 scheme,
668 connections.remove(0),
669 Consumer::dummy(),
670 prod3,
671 )
672 .await;
673
674 for _ in 0..20 {
676 mailbox1.fetch(key_a.clone()).await;
678
679 let event = cons_out1.recv().await.unwrap();
681 match event {
682 Event::Success(key_actual, value) => {
683 assert_eq!(key_actual, key_a);
684 assert_eq!(value, valid_data_a);
685 }
686 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
687 }
688 }
689
690 mailbox1.fetch(key_b.clone()).await;
692
693 context.sleep(Duration::from_secs(5)).await;
695
696 mailbox1.cancel(key_b.clone()).await;
698
699 let event = cons_out1.recv().await.unwrap();
701 match event {
702 Event::Failed(key_actual) => {
703 assert_eq!(key_actual, key_b);
704 }
705 Event::Success(_, _) => panic!("Fetch should have been canceled"),
706 }
707
708 let blocked = oracle.blocked().await.unwrap();
710 assert_eq!(blocked.len(), 1);
711 assert_eq!(blocked[0].0, peers[0]);
712 assert_eq!(blocked[0].1, peers[1]);
713 });
714 }
715
716 #[test_traced]
720 fn test_duplicate_fetch_request() {
721 let executor = deterministic::Runner::timed(Duration::from_secs(10));
722 executor.start(|context| async move {
723 let (mut oracle, mut schemes, peers, mut connections) =
724 setup_network_and_peers(&context, &[1, 2]).await;
725
726 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
727
728 let key = Key(5);
729 let mut prod2 = Producer::default();
730 prod2.insert(key.clone(), Bytes::from("data for key 5"));
731
732 let (cons1, mut cons_out1) = Consumer::new();
733
734 let scheme = schemes.remove(0);
735 let mut mailbox1 = setup_and_spawn_actor(
736 &context,
737 oracle.manager(),
738 oracle.control(scheme.public_key()),
739 scheme,
740 connections.remove(0),
741 cons1,
742 Producer::default(),
743 )
744 .await;
745
746 let scheme = schemes.remove(0);
747 let _mailbox2 = setup_and_spawn_actor(
748 &context,
749 oracle.manager(),
750 oracle.control(scheme.public_key()),
751 scheme,
752 connections.remove(0),
753 Consumer::dummy(),
754 prod2,
755 )
756 .await;
757
758 mailbox1.fetch(key.clone()).await;
760 mailbox1.fetch(key.clone()).await;
761
762 let event = cons_out1.recv().await.unwrap();
764 match event {
765 Event::Success(key_actual, value) => {
766 assert_eq!(key_actual, key);
767 assert_eq!(value, Bytes::from("data for key 5"));
768 }
769 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
770 }
771
772 select! {
774 _ = cons_out1.recv() => {
775 panic!("Unexpected second event received for duplicate fetch");
776 },
777 _ = context.sleep(Duration::from_millis(500)) => {
778 },
780 };
781 });
782 }
783
784 #[test_traced]
788 fn test_changing_peer_sets() {
789 let executor = deterministic::Runner::timed(Duration::from_secs(10));
790 executor.start(|context| async move {
791 let (mut oracle, mut schemes, peers, mut connections) =
792 setup_network_and_peers(&context, &[1, 2, 3]).await;
793
794 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
795 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
796
797 let key1 = Key(1);
798 let key2 = Key(2);
799
800 let mut prod2 = Producer::default();
801 prod2.insert(key1.clone(), Bytes::from("data from peer 2"));
802
803 let mut prod3 = Producer::default();
804 prod3.insert(key2.clone(), Bytes::from("data from peer 3"));
805
806 let (cons1, mut cons_out1) = Consumer::new();
807
808 let scheme = schemes.remove(0);
809 let mut mailbox1 = setup_and_spawn_actor(
810 &context,
811 oracle.manager(),
812 oracle.control(scheme.public_key()),
813 scheme,
814 connections.remove(0),
815 cons1,
816 Producer::default(),
817 )
818 .await;
819
820 let scheme = schemes.remove(0);
821 let _mailbox2 = setup_and_spawn_actor(
822 &context,
823 oracle.manager(),
824 oracle.control(scheme.public_key()),
825 scheme,
826 connections.remove(0),
827 Consumer::dummy(),
828 prod2,
829 )
830 .await;
831
832 mailbox1.fetch(key1.clone()).await;
834
835 let event = cons_out1.recv().await.unwrap();
837 match event {
838 Event::Success(key_actual, value) => {
839 assert_eq!(key_actual, key1);
840 assert_eq!(value, Bytes::from("data from peer 2"));
841 }
842 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
843 }
844
845 let scheme = schemes.remove(0);
847 let _mailbox3 = setup_and_spawn_actor(
848 &context,
849 oracle.manager(),
850 oracle.control(scheme.public_key()),
851 scheme,
852 connections.remove(0),
853 Consumer::dummy(),
854 prod3,
855 )
856 .await;
857
858 context.sleep(Duration::from_millis(200)).await;
860
861 mailbox1.fetch(key2.clone()).await;
863
864 let event = cons_out1.recv().await.unwrap();
866 match event {
867 Event::Success(key_actual, value) => {
868 assert_eq!(key_actual, key2);
869 assert_eq!(value, Bytes::from("data from peer 3"));
870 }
871 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
872 }
873 });
874 }
875
876 #[test_traced]
877 fn test_fetch_targeted() {
878 let executor = deterministic::Runner::timed(Duration::from_secs(10));
879 executor.start(|context| async move {
880 let (mut oracle, mut schemes, peers, mut connections) =
881 setup_network_and_peers(&context, &[1, 2, 3]).await;
882
883 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
884 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
885
886 let key = Key(1);
887 let invalid_data = Bytes::from("invalid data");
888 let valid_data = Bytes::from("valid data");
889
890 let mut prod2 = Producer::default();
892 prod2.insert(key.clone(), invalid_data.clone());
893
894 let mut prod3 = Producer::default();
895 prod3.insert(key.clone(), valid_data.clone());
896
897 let (mut cons1, mut cons_out1) = Consumer::new();
899 cons1.add_expected(key.clone(), valid_data.clone());
900
901 let scheme = schemes.remove(0);
902 let mut mailbox1 = setup_and_spawn_actor(
903 &context,
904 oracle.manager(),
905 oracle.control(scheme.public_key()),
906 scheme,
907 connections.remove(0),
908 cons1,
909 Producer::default(),
910 )
911 .await;
912
913 let scheme = schemes.remove(0);
914 let _mailbox2 = setup_and_spawn_actor(
915 &context,
916 oracle.manager(),
917 oracle.control(scheme.public_key()),
918 scheme,
919 connections.remove(0),
920 Consumer::dummy(),
921 prod2,
922 )
923 .await;
924
925 let scheme = schemes.remove(0);
926 let _mailbox3 = setup_and_spawn_actor(
927 &context,
928 oracle.manager(),
929 oracle.control(scheme.public_key()),
930 scheme,
931 connections.remove(0),
932 Consumer::dummy(),
933 prod3,
934 )
935 .await;
936
937 context.sleep(Duration::from_millis(100)).await;
939
940 mailbox1
944 .fetch_targeted(
945 key.clone(),
946 non_empty_vec![peers[1].clone(), peers[2].clone()],
947 )
948 .await;
949
950 let event = cons_out1.recv().await.unwrap();
952 match event {
953 Event::Success(key_actual, value) => {
954 assert_eq!(key_actual, key);
955 assert_eq!(value, valid_data);
956 }
957 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
958 }
959
960 let blocked = oracle.blocked().await.unwrap();
962 assert_eq!(blocked.len(), 1);
963 assert_eq!(blocked[0].0, peers[0]);
964 assert_eq!(blocked[0].1, peers[1]);
965
966 let metrics = context.encode();
968 assert!(metrics.contains("_fetch_total{status=\"Success\"} 1"));
969 });
970 }
971
972 #[test_traced]
973 fn test_fetch_targeted_no_fallback() {
974 let executor = deterministic::Runner::timed(Duration::from_secs(10));
975 executor.start(|context| async move {
976 let (mut oracle, mut schemes, peers, mut connections) =
977 setup_network_and_peers(&context, &[1, 2, 3, 4]).await;
978
979 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
980 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
981 add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;
982
983 let key = Key(1);
984
985 let mut prod4 = Producer::default();
987 prod4.insert(key.clone(), Bytes::from("data from peer 4"));
988
989 let (cons1, mut cons_out1) = Consumer::new();
990
991 let scheme = schemes.remove(0);
992 let mut mailbox1 = setup_and_spawn_actor(
993 &context,
994 oracle.manager(),
995 oracle.control(scheme.public_key()),
996 scheme,
997 connections.remove(0),
998 cons1,
999 Producer::default(),
1000 )
1001 .await;
1002
1003 let scheme = schemes.remove(0);
1004 let _mailbox2 = setup_and_spawn_actor(
1005 &context,
1006 oracle.manager(),
1007 oracle.control(scheme.public_key()),
1008 scheme,
1009 connections.remove(0),
1010 Consumer::dummy(),
1011 Producer::default(), )
1013 .await;
1014
1015 let scheme = schemes.remove(0);
1016 let _mailbox3 = setup_and_spawn_actor(
1017 &context,
1018 oracle.manager(),
1019 oracle.control(scheme.public_key()),
1020 scheme,
1021 connections.remove(0),
1022 Consumer::dummy(),
1023 Producer::default(), )
1025 .await;
1026
1027 let scheme = schemes.remove(0);
1028 let _mailbox4 = setup_and_spawn_actor(
1029 &context,
1030 oracle.manager(),
1031 oracle.control(scheme.public_key()),
1032 scheme,
1033 connections.remove(0),
1034 Consumer::dummy(),
1035 prod4,
1036 )
1037 .await;
1038
1039 context.sleep(Duration::from_millis(100)).await;
1041
1042 mailbox1
1045 .fetch_targeted(
1046 key.clone(),
1047 non_empty_vec![peers[1].clone(), peers[2].clone()],
1048 )
1049 .await;
1050
1051 select! {
1054 event = cons_out1.recv() => {
1055 panic!("Fetch should not succeed, but got: {event:?}");
1056 },
1057 _ = context.sleep(Duration::from_secs(3)) => {
1058 },
1060 };
1061 });
1062 }
1063
1064 #[test_traced]
1065 fn test_fetch_all_targeted() {
1066 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1067 executor.start(|context| async move {
1068 let (mut oracle, mut schemes, peers, mut connections) =
1069 setup_network_and_peers(&context, &[1, 2, 3, 4]).await;
1070
1071 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1072 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1073 add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;
1074
1075 let key1 = Key(1);
1076 let key2 = Key(2);
1077 let key3 = Key(3);
1078
1079 let mut prod2 = Producer::default();
1081 prod2.insert(key1.clone(), Bytes::from("data for key 1"));
1082
1083 let mut prod3 = Producer::default();
1085 prod3.insert(key3.clone(), Bytes::from("data for key 3"));
1086
1087 let mut prod4 = Producer::default();
1089 prod4.insert(key2.clone(), Bytes::from("data for key 2"));
1090
1091 let (mut cons1, mut cons_out1) = Consumer::new();
1093 cons1.add_expected(key1.clone(), Bytes::from("data for key 1"));
1094 cons1.add_expected(key2.clone(), Bytes::from("data for key 2"));
1095 cons1.add_expected(key3.clone(), Bytes::from("data for key 3"));
1096
1097 let scheme = schemes.remove(0);
1098 let mut mailbox1 = setup_and_spawn_actor(
1099 &context,
1100 oracle.manager(),
1101 oracle.control(scheme.public_key()),
1102 scheme,
1103 connections.remove(0),
1104 cons1,
1105 Producer::default(),
1106 )
1107 .await;
1108
1109 let scheme = schemes.remove(0);
1110 let _mailbox2 = setup_and_spawn_actor(
1111 &context,
1112 oracle.manager(),
1113 oracle.control(scheme.public_key()),
1114 scheme,
1115 connections.remove(0),
1116 Consumer::dummy(),
1117 prod2,
1118 )
1119 .await;
1120
1121 let scheme = schemes.remove(0);
1122 let _mailbox3 = setup_and_spawn_actor(
1123 &context,
1124 oracle.manager(),
1125 oracle.control(scheme.public_key()),
1126 scheme,
1127 connections.remove(0),
1128 Consumer::dummy(),
1129 prod3,
1130 )
1131 .await;
1132
1133 let scheme = schemes.remove(0);
1134 let _mailbox4 = setup_and_spawn_actor(
1135 &context,
1136 oracle.manager(),
1137 oracle.control(scheme.public_key()),
1138 scheme,
1139 connections.remove(0),
1140 Consumer::dummy(),
1141 prod4,
1142 )
1143 .await;
1144
1145 context.sleep(Duration::from_millis(100)).await;
1147
1148 mailbox1
1153 .fetch_all_targeted(vec![
1154 (key1.clone(), non_empty_vec![peers[1].clone()]), (key2.clone(), non_empty_vec![peers[3].clone()]), ])
1157 .await;
1158 mailbox1.fetch(key3.clone()).await; let mut results = HashMap::new();
1162 for _ in 0..3 {
1163 let event = cons_out1.recv().await.unwrap();
1164 match event {
1165 Event::Success(key, value) => {
1166 results.insert(key, value);
1167 }
1168 Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
1169 }
1170 }
1171
1172 assert_eq!(results.len(), 3);
1174 assert_eq!(results.get(&key1).unwrap(), &Bytes::from("data for key 1"));
1175 assert_eq!(results.get(&key2).unwrap(), &Bytes::from("data for key 2"));
1176 assert_eq!(results.get(&key3).unwrap(), &Bytes::from("data for key 3"));
1177
1178 let metrics = context.encode();
1180 assert!(metrics.contains("_fetch_total{status=\"Success\"} 3"));
1181 });
1182 }
1183
1184 #[test_traced]
1187 fn test_fetch_clears_targets() {
1188 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1189 executor.start(|context| async move {
1190 let (mut oracle, mut schemes, peers, mut connections) =
1191 setup_network_and_peers(&context, &[1, 2, 3]).await;
1192
1193 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1194 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1195
1196 let key = Key(1);
1197 let valid_data = Bytes::from("valid data");
1198
1199 let mut prod3 = Producer::default();
1201 prod3.insert(key.clone(), valid_data.clone());
1202
1203 let (cons1, mut cons_out1) = Consumer::new();
1204
1205 let scheme = schemes.remove(0);
1206 let mut mailbox1 = setup_and_spawn_actor(
1207 &context,
1208 oracle.manager(),
1209 oracle.control(scheme.public_key()),
1210 scheme,
1211 connections.remove(0),
1212 cons1,
1213 Producer::default(),
1214 )
1215 .await;
1216
1217 let scheme = schemes.remove(0);
1218 let _mailbox2 = setup_and_spawn_actor(
1219 &context,
1220 oracle.manager(),
1221 oracle.control(scheme.public_key()),
1222 scheme,
1223 connections.remove(0),
1224 Consumer::dummy(),
1225 Producer::default(), )
1227 .await;
1228
1229 let scheme = schemes.remove(0);
1230 let _mailbox3 = setup_and_spawn_actor(
1231 &context,
1232 oracle.manager(),
1233 oracle.control(scheme.public_key()),
1234 scheme,
1235 connections.remove(0),
1236 Consumer::dummy(),
1237 prod3,
1238 )
1239 .await;
1240
1241 context.sleep(Duration::from_millis(100)).await;
1243
1244 mailbox1
1246 .fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()])
1247 .await;
1248
1249 context.sleep(Duration::from_millis(500)).await;
1251
1252 mailbox1.fetch(key.clone()).await;
1254
1255 let event = cons_out1.recv().await.unwrap();
1257 match event {
1258 Event::Success(key_actual, value) => {
1259 assert_eq!(key_actual, key);
1260 assert_eq!(value, valid_data);
1261 }
1262 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1263 }
1264 });
1265 }
1266
1267 #[test_traced]
1268 fn test_fetch_targeted_does_not_restrict_all() {
1269 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1270 executor.start(|context| async move {
1271 let (mut oracle, mut schemes, peers, mut connections) =
1272 setup_network_and_peers(&context, &[1, 2, 3]).await;
1273
1274 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1275 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1276
1277 let key = Key(1);
1278 let valid_data = Bytes::from("valid data");
1279
1280 let mut prod3 = Producer::default();
1282 prod3.insert(key.clone(), valid_data.clone());
1283
1284 let (cons1, mut cons_out1) = Consumer::new();
1285
1286 let scheme = schemes.remove(0);
1287 let mut mailbox1 = setup_and_spawn_actor(
1288 &context,
1289 oracle.manager(),
1290 oracle.control(scheme.public_key()),
1291 scheme,
1292 connections.remove(0),
1293 cons1,
1294 Producer::default(),
1295 )
1296 .await;
1297
1298 let scheme = schemes.remove(0);
1299 let _mailbox2 = setup_and_spawn_actor(
1300 &context,
1301 oracle.manager(),
1302 oracle.control(scheme.public_key()),
1303 scheme,
1304 connections.remove(0),
1305 Consumer::dummy(),
1306 Producer::default(), )
1308 .await;
1309
1310 let scheme = schemes.remove(0);
1311 let _mailbox3 = setup_and_spawn_actor(
1312 &context,
1313 oracle.manager(),
1314 oracle.control(scheme.public_key()),
1315 scheme,
1316 connections.remove(0),
1317 Consumer::dummy(),
1318 prod3,
1319 )
1320 .await;
1321
1322 context.sleep(Duration::from_millis(100)).await;
1324
1325 mailbox1.fetch(key.clone()).await;
1327
1328 context.sleep(Duration::from_millis(50)).await;
1330
1331 mailbox1
1334 .fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()])
1335 .await;
1336
1337 let event = cons_out1.recv().await.unwrap();
1340 match event {
1341 Event::Success(key_actual, value) => {
1342 assert_eq!(key_actual, key);
1343 assert_eq!(value, valid_data);
1344 }
1345 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1346 }
1347 });
1348 }
1349
1350 #[test_traced]
1351 fn test_retain() {
1352 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1353 executor.start(|context| async move {
1354 let (mut oracle, mut schemes, peers, mut connections) =
1355 setup_network_and_peers(&context, &[1, 2]).await;
1356
1357 let key = Key(5);
1358 let mut prod2 = Producer::default();
1359 prod2.insert(key.clone(), Bytes::from("data for key 5"));
1360
1361 let (cons1, mut cons_out1) = Consumer::new();
1362
1363 let scheme = schemes.remove(0);
1364 let mut mailbox1 = setup_and_spawn_actor(
1365 &context,
1366 oracle.manager(),
1367 oracle.control(scheme.public_key()),
1368 scheme,
1369 connections.remove(0),
1370 cons1,
1371 Producer::default(),
1372 )
1373 .await;
1374
1375 let scheme = schemes.remove(0);
1376 let _mailbox2 = setup_and_spawn_actor(
1377 &context,
1378 oracle.manager(),
1379 oracle.control(scheme.public_key()),
1380 scheme,
1381 connections.remove(0),
1382 Consumer::dummy(),
1383 prod2,
1384 )
1385 .await;
1386
1387 mailbox1.retain(|_| true).await;
1389 select! {
1390 _ = cons_out1.recv() => {
1391 panic!("unexpected event");
1392 },
1393 _ = context.sleep(Duration::from_millis(100)) => {},
1394 };
1395
1396 mailbox1.fetch(key.clone()).await;
1398
1399 let key_clone = key.clone();
1402 mailbox1.retain(move |k| k != &key_clone).await;
1403
1404 let event = cons_out1.recv().await.unwrap();
1406 match event {
1407 Event::Failed(key_actual) => {
1408 assert_eq!(key_actual, key);
1409 }
1410 Event::Success(_, _) => panic!("Fetch should have been retained out"),
1411 }
1412
1413 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1415
1416 mailbox1.fetch(key.clone()).await;
1419
1420 let event = cons_out1.recv().await.unwrap();
1422 match event {
1423 Event::Success(key_actual, value) => {
1424 assert_eq!(key_actual, key);
1425 assert_eq!(value, Bytes::from("data for key 5"));
1426 }
1427 Event::Failed(_) => unreachable!(),
1428 }
1429 });
1430 }
1431
1432 #[test_traced]
1433 fn test_clear() {
1434 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1435 executor.start(|context| async move {
1436 let (mut oracle, mut schemes, peers, mut connections) =
1437 setup_network_and_peers(&context, &[1, 2]).await;
1438
1439 let key = Key(6);
1441 let mut prod2 = Producer::default();
1442 prod2.insert(key.clone(), Bytes::from("data for key 6"));
1443
1444 let (cons1, mut cons_out1) = Consumer::new();
1445
1446 let scheme = schemes.remove(0);
1447 let mut mailbox1 = setup_and_spawn_actor(
1448 &context,
1449 oracle.manager(),
1450 oracle.control(scheme.public_key()),
1451 scheme,
1452 connections.remove(0),
1453 cons1,
1454 Producer::default(),
1455 )
1456 .await;
1457
1458 let scheme = schemes.remove(0);
1459 let _mailbox2 = setup_and_spawn_actor(
1460 &context,
1461 oracle.manager(),
1462 oracle.control(scheme.public_key()),
1463 scheme,
1464 connections.remove(0),
1465 Consumer::dummy(),
1466 prod2,
1467 )
1468 .await;
1469
1470 mailbox1.clear().await;
1472 select! {
1473 _ = cons_out1.recv() => {
1474 panic!("unexpected event");
1475 },
1476 _ = context.sleep(Duration::from_millis(100)) => {},
1477 };
1478
1479 mailbox1.fetch(key.clone()).await;
1481
1482 mailbox1.clear().await;
1484
1485 let event = cons_out1.recv().await.unwrap();
1487 match event {
1488 Event::Failed(key_actual) => {
1489 assert_eq!(key_actual, key);
1490 }
1491 Event::Success(_, _) => panic!("Fetch should have been cleared"),
1492 }
1493
1494 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1496
1497 mailbox1.fetch(key.clone()).await;
1500
1501 let event = cons_out1.recv().await.unwrap();
1503 match event {
1504 Event::Success(key_actual, value) => {
1505 assert_eq!(key_actual, key);
1506 assert_eq!(value, Bytes::from("data for key 6"));
1507 }
1508 Event::Failed(_) => unreachable!(),
1509 }
1510 });
1511 }
1512
1513 #[test_traced]
1517 fn test_rate_limit_spillover() {
1518 let executor = deterministic::Runner::timed(Duration::from_secs(30));
1519 executor.start(|context| async move {
1520 let (mut oracle, mut schemes, peers, mut connections) =
1522 setup_network_and_peers_with_rate_limit(
1523 &context,
1524 &[1, 2, 3],
1525 Quota::per_second(NZU32!(1)),
1526 )
1527 .await;
1528
1529 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1531 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1532
1533 let mut prod2 = Producer::default();
1535 let mut prod3 = Producer::default();
1536 prod2.insert(Key(0), Bytes::from("data for key 0"));
1537 prod2.insert(Key(1), Bytes::from("data for key 1"));
1538 prod3.insert(Key(0), Bytes::from("data for key 0"));
1539 prod3.insert(Key(1), Bytes::from("data for key 1"));
1540
1541 let (cons1, mut cons_out1) = Consumer::new();
1542
1543 let scheme = schemes.remove(0);
1545 let mut mailbox1 = setup_and_spawn_actor(
1546 &context,
1547 oracle.manager(),
1548 oracle.control(scheme.public_key()),
1549 scheme,
1550 connections.remove(0),
1551 cons1,
1552 Producer::default(),
1553 )
1554 .await;
1555
1556 let scheme = schemes.remove(0);
1558 let _mailbox2 = setup_and_spawn_actor(
1559 &context,
1560 oracle.manager(),
1561 oracle.control(scheme.public_key()),
1562 scheme,
1563 connections.remove(0),
1564 Consumer::dummy(),
1565 prod2,
1566 )
1567 .await;
1568
1569 let scheme = schemes.remove(0);
1571 let _mailbox3 = setup_and_spawn_actor(
1572 &context,
1573 oracle.manager(),
1574 oracle.control(scheme.public_key()),
1575 scheme,
1576 connections.remove(0),
1577 Consumer::dummy(),
1578 prod3,
1579 )
1580 .await;
1581
1582 context.sleep(Duration::from_millis(100)).await;
1584 let start = context.current();
1585
1586 mailbox1.fetch(Key(0)).await;
1590 mailbox1.fetch(Key(1)).await;
1591
1592 let mut results = HashMap::new();
1594 for _ in 0..2 {
1595 let event = cons_out1.recv().await.unwrap();
1596 match event {
1597 Event::Success(key, value) => {
1598 results.insert(key.clone(), value);
1599 }
1600 Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
1601 }
1602 }
1603
1604 assert_eq!(results.len(), 2);
1606 assert_eq!(
1607 results.get(&Key(0)).unwrap(),
1608 &Bytes::from("data for key 0")
1609 );
1610 assert_eq!(
1611 results.get(&Key(1)).unwrap(),
1612 &Bytes::from("data for key 1")
1613 );
1614
1615 let elapsed = context.current().duration_since(start).unwrap();
1618 assert!(
1619 elapsed < Duration::from_millis(500),
1620 "Expected quick completion via spill-over, but took {elapsed:?}"
1621 );
1622 });
1623 }
1624
1625 #[test_traced]
1629 fn test_rate_limit_retry_after_reset() {
1630 let executor = deterministic::Runner::timed(Duration::from_secs(30));
1631 executor.start(|context| async move {
1632 let (mut oracle, mut schemes, peers, mut connections) =
1634 setup_network_and_peers_with_rate_limit(
1635 &context,
1636 &[1, 2],
1637 Quota::per_second(NZU32!(1)),
1638 )
1639 .await;
1640
1641 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1642
1643 let mut prod2 = Producer::default();
1645 prod2.insert(Key(1), Bytes::from("data for key 1"));
1646 prod2.insert(Key(2), Bytes::from("data for key 2"));
1647 prod2.insert(Key(3), Bytes::from("data for key 3"));
1648
1649 let (cons1, mut cons_out1) = Consumer::new();
1650
1651 let scheme = schemes.remove(0);
1652 let mut mailbox1 = setup_and_spawn_actor(
1653 &context,
1654 oracle.manager(),
1655 oracle.control(scheme.public_key()),
1656 scheme,
1657 connections.remove(0),
1658 cons1,
1659 Producer::default(),
1660 )
1661 .await;
1662
1663 let scheme = schemes.remove(0);
1664 let _mailbox2 = setup_and_spawn_actor(
1665 &context,
1666 oracle.manager(),
1667 oracle.control(scheme.public_key()),
1668 scheme,
1669 connections.remove(0),
1670 Consumer::dummy(),
1671 prod2,
1672 )
1673 .await;
1674
1675 context.sleep(Duration::from_millis(100)).await;
1677 let start = context.current();
1678
1679 mailbox1.fetch(Key(1)).await;
1682 mailbox1.fetch(Key(2)).await;
1683 mailbox1.fetch(Key(3)).await;
1684
1685 let mut results = HashMap::new();
1687 for _ in 0..3 {
1688 let event = cons_out1.recv().await.unwrap();
1689 match event {
1690 Event::Success(key, value) => {
1691 results.insert(key.clone(), value);
1692 }
1693 Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
1694 }
1695 }
1696
1697 assert_eq!(results.len(), 3);
1698 for i in 1..=3 {
1699 assert_eq!(
1700 results.get(&Key(i)).unwrap(),
1701 &Bytes::from(format!("data for key {}", i))
1702 );
1703 }
1704
1705 let elapsed = context.current().duration_since(start).unwrap();
1709 assert!(
1710 elapsed > Duration::from_secs(2),
1711 "Expected rate limiting to cause delay > 2s, but took {elapsed:?}"
1712 );
1713 });
1714 }
1715
1716 #[test_traced]
1720 fn test_self_exclusion() {
1721 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1722 executor.start(|context| async move {
1723 let (mut oracle, mut schemes, peers, mut connections) =
1724 setup_network_and_peers(&context, &[1, 2]).await;
1725
1726 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1727
1728 let key = Key(1);
1729 let data = Bytes::from("shared data");
1730
1731 let mut prod1 = Producer::default();
1733 prod1.insert(key.clone(), data.clone());
1734 let mut prod2 = Producer::default();
1735 prod2.insert(key.clone(), data.clone());
1736
1737 let (cons1, mut cons_out1) = Consumer::new();
1738
1739 let scheme = schemes.remove(0);
1741 let mut mailbox1 = setup_and_spawn_actor(
1742 &context,
1743 oracle.manager(),
1744 oracle.control(scheme.public_key()),
1745 scheme,
1746 connections.remove(0),
1747 cons1,
1748 prod1, )
1750 .await;
1751
1752 let scheme = schemes.remove(0);
1754 let _mailbox2 = setup_and_spawn_actor(
1755 &context,
1756 oracle.manager(),
1757 oracle.control(scheme.public_key()),
1758 scheme,
1759 connections.remove(0),
1760 Consumer::dummy(),
1761 prod2,
1762 )
1763 .await;
1764
1765 context.sleep(Duration::from_millis(100)).await;
1767
1768 mailbox1.fetch(key.clone()).await;
1770
1771 let event = cons_out1.recv().await.unwrap();
1773 match event {
1774 Event::Success(key_actual, value) => {
1775 assert_eq!(key_actual, key);
1776 assert_eq!(value, data);
1777 }
1778 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1779 }
1780 });
1781 }
1782
1783 #[allow(clippy::type_complexity)]
1784 fn spawn_actors_with_handles(
1785 context: deterministic::Context,
1786 oracle: &Oracle<PublicKey, deterministic::Context>,
1787 schemes: Vec<PrivateKey>,
1788 connections: Vec<(
1789 Sender<PublicKey, deterministic::Context>,
1790 Receiver<PublicKey>,
1791 )>,
1792 consumers: Vec<Consumer<Key, Bytes>>,
1793 producers: Vec<Producer<Key, Bytes>>,
1794 ) -> (
1795 Vec<Mailbox<Key, PublicKey>>,
1796 Vec<commonware_runtime::Handle<()>>,
1797 ) {
1798 let actor_context = context.with_label("actor");
1799 let mut mailboxes = Vec::new();
1800 let mut handles = Vec::new();
1801
1802 for (idx, ((scheme, conn), (consumer, producer))) in schemes
1803 .into_iter()
1804 .zip(connections)
1805 .zip(consumers.into_iter().zip(producers))
1806 .enumerate()
1807 {
1808 let ctx = actor_context.with_label(&format!("peer_{idx}"));
1809 let public_key = scheme.public_key();
1810 let (engine, mailbox) = Engine::new(
1811 ctx,
1812 Config {
1813 provider: oracle.manager(),
1814 blocker: oracle.control(public_key.clone()),
1815 consumer,
1816 producer,
1817 mailbox_size: MAILBOX_SIZE,
1818 me: Some(public_key),
1819 initial: INITIAL_DURATION,
1820 timeout: TIMEOUT,
1821 fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
1822 priority_requests: false,
1823 priority_responses: false,
1824 },
1825 );
1826 handles.push(engine.start(conn));
1827 mailboxes.push(mailbox);
1828 }
1829
1830 (mailboxes, handles)
1831 }
1832
1833 #[test_traced]
1834 fn test_operations_after_shutdown_do_not_panic() {
1835 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1836 executor.start(|context| async move {
1837 let (mut oracle, schemes, peers, connections) =
1838 setup_network_and_peers(&context, &[1, 2]).await;
1839
1840 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1841
1842 let key = Key(1);
1843 let mut prod2 = Producer::default();
1844 prod2.insert(key.clone(), Bytes::from("data for key 1"));
1845
1846 let (cons1, mut cons_out1) = Consumer::new();
1847
1848 let (mut mailboxes, handles) = spawn_actors_with_handles(
1849 context.clone(),
1850 &oracle,
1851 schemes,
1852 connections,
1853 vec![cons1, Consumer::dummy()],
1854 vec![Producer::default(), prod2],
1855 );
1856
1857 mailboxes[0].fetch(key.clone()).await;
1859 let event = cons_out1.recv().await.unwrap();
1860 match event {
1861 Event::Success(_, value) => assert_eq!(value, Bytes::from("data for key 1")),
1862 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1863 }
1864
1865 for handle in handles {
1867 handle.abort();
1868 }
1869 context.sleep(Duration::from_millis(100)).await;
1870
1871 let key2 = Key(2);
1875 mailboxes[0].fetch(key2.clone()).await;
1876
1877 mailboxes[0].cancel(key2.clone()).await;
1879
1880 mailboxes[0].clear().await;
1882
1883 mailboxes[0].retain(|_| true).await;
1885
1886 mailboxes[0]
1888 .fetch_targeted(Key(3), non_empty_vec![peers[1].clone()])
1889 .await;
1890 });
1891 }
1892
1893 fn clean_shutdown(seed: u64) {
1894 let cfg = deterministic::Config::default()
1895 .with_seed(seed)
1896 .with_timeout(Some(Duration::from_secs(30)));
1897 let executor = deterministic::Runner::new(cfg);
1898 executor.start(|context| async move {
1899 let (mut oracle, schemes, peers, connections) =
1900 setup_network_and_peers(&context, &[1, 2]).await;
1901
1902 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1903
1904 let key = Key(1);
1905 let mut prod2 = Producer::default();
1906 prod2.insert(key.clone(), Bytes::from("data for key 1"));
1907
1908 let (cons1, mut cons_out1) = Consumer::new();
1909
1910 let (mut mailboxes, handles) = spawn_actors_with_handles(
1911 context.clone(),
1912 &oracle,
1913 schemes,
1914 connections,
1915 vec![cons1, Consumer::dummy()],
1916 vec![Producer::default(), prod2],
1917 );
1918
1919 context.sleep(Duration::from_millis(100)).await;
1921
1922 let running_before = count_running_tasks(&context, "actor");
1924 assert!(
1925 running_before > 0,
1926 "at least one actor task should be running"
1927 );
1928
1929 mailboxes[0].fetch(key.clone()).await;
1931 let event = cons_out1.recv().await.unwrap();
1932 match event {
1933 Event::Success(_, value) => assert_eq!(value, Bytes::from("data for key 1")),
1934 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1935 }
1936
1937 for handle in handles {
1939 handle.abort();
1940 }
1941 context.sleep(Duration::from_millis(100)).await;
1942
1943 let running_after = count_running_tasks(&context, "actor");
1945 assert_eq!(
1946 running_after, 0,
1947 "all actor tasks should be stopped, but {running_after} still running"
1948 );
1949 });
1950 }
1951
1952 #[test]
1953 fn test_clean_shutdown() {
1954 for seed in 0..25 {
1955 clean_shutdown(seed);
1956 }
1957 }
1958}