1use bytes::Bytes;
44use commonware_utils::{channel::oneshot, Span};
45use std::future::Future;
46
47mod config;
48pub use config::Config;
49mod engine;
50pub use engine::Engine;
51mod fetcher;
52mod ingress;
53pub use ingress::Mailbox;
54mod metrics;
55mod wire;
56
57#[cfg(feature = "mocks")]
58pub mod mocks;
59
/// Produces data for a requested key.
///
/// Implementors must be `Clone + Send + 'static`.
pub trait Producer: Clone + Send + 'static {
    /// Key type identifying a piece of data; must implement [`Span`].
    type Key: Span;

    /// Starts producing the data associated with `key`.
    ///
    /// The returned future resolves to a [`oneshot::Receiver`] on which the
    /// [`Bytes`] payload is expected to arrive.
    // NOTE(review): presumably dropping the sender without sending signals
    // that no data is available for `key` — confirm against the engine.
    fn produce(&mut self, key: Self::Key) -> impl Future<Output = oneshot::Receiver<Bytes>> + Send;
}
68
69#[cfg(test)]
70mod tests {
71 use super::{
72 mocks::{Consumer, Event, Key, Producer},
73 Config, Engine, Mailbox,
74 };
75 use crate::Resolver;
76 use bytes::Bytes;
77 use commonware_cryptography::{
78 ed25519::{PrivateKey, PublicKey},
79 Signer,
80 };
81 use commonware_macros::{select, test_traced};
82 use commonware_p2p::{
83 simulated::{Link, Network, Oracle, Receiver, Sender},
84 Blocker, Manager, Provider,
85 };
86 use commonware_runtime::{count_running_tasks, deterministic, Clock, Metrics, Quota, Runner};
87 use commonware_utils::{non_empty_vec, NZU32};
88 use std::{collections::HashMap, num::NonZeroU32, time::Duration};
89
    // Value passed as `Config::mailbox_size` for every actor spawned by
    // `setup_and_spawn_actor`.
    const MAILBOX_SIZE: usize = 1024;
    // Per-second quota used by `setup_network_and_peers` when registering
    // channels.
    const RATE_LIMIT: NonZeroU32 = NZU32!(10);
    // Value passed as `Config::initial`.
    const INITIAL_DURATION: Duration = Duration::from_millis(100);
    // Value passed as `Config::timeout`.
    const TIMEOUT: Duration = Duration::from_millis(400);
    // Value passed as `Config::fetch_retry_timeout`.
    const FETCH_RETRY_TIMEOUT: Duration = Duration::from_millis(100);
    // Link with 10ms latency, 1ms jitter and a success rate of 1.0.
    const LINK: Link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(1),
        success_rate: 1.0,
    };
    // Same latency and jitter as `LINK`, but with a success rate of 0.5.
    const LINK_UNRELIABLE: Link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(1),
        success_rate: 0.5,
    };
105
106 async fn setup_network_and_peers(
107 context: &deterministic::Context,
108 peer_seeds: &[u64],
109 ) -> (
110 Oracle<PublicKey, deterministic::Context>,
111 Vec<PrivateKey>,
112 Vec<PublicKey>,
113 Vec<(
114 Sender<PublicKey, deterministic::Context>,
115 Receiver<PublicKey>,
116 )>,
117 ) {
118 setup_network_and_peers_with_rate_limit(context, peer_seeds, Quota::per_second(RATE_LIMIT))
119 .await
120 }
121
122 async fn setup_network_and_peers_with_rate_limit(
123 context: &deterministic::Context,
124 peer_seeds: &[u64],
125 rate_limit: Quota,
126 ) -> (
127 Oracle<PublicKey, deterministic::Context>,
128 Vec<PrivateKey>,
129 Vec<PublicKey>,
130 Vec<(
131 Sender<PublicKey, deterministic::Context>,
132 Receiver<PublicKey>,
133 )>,
134 ) {
135 let (network, oracle) = Network::new(
136 context.with_label("network"),
137 commonware_p2p::simulated::Config {
138 max_size: 1024 * 1024,
139 disconnect_on_block: true,
140 tracked_peer_sets: Some(3),
141 },
142 );
143 network.start();
144
145 let schemes: Vec<PrivateKey> = peer_seeds
146 .iter()
147 .map(|seed| PrivateKey::from_seed(*seed))
148 .collect();
149 let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
150 let mut manager = oracle.manager();
151 manager.track(0, peers.clone().try_into().unwrap()).await;
152
153 let mut connections = Vec::new();
154 for peer in &peers {
155 let (sender, receiver) = oracle
156 .control(peer.clone())
157 .register(0, rate_limit)
158 .await
159 .unwrap();
160 connections.push((sender, receiver));
161 }
162
163 (oracle, schemes, peers, connections)
164 }
165
166 async fn add_link(
167 oracle: &mut Oracle<PublicKey, deterministic::Context>,
168 link: Link,
169 peers: &[PublicKey],
170 from: usize,
171 to: usize,
172 ) {
173 oracle
174 .add_link(peers[from].clone(), peers[to].clone(), link.clone())
175 .await
176 .unwrap();
177 oracle
178 .add_link(peers[to].clone(), peers[from].clone(), link)
179 .await
180 .unwrap();
181 }
182
183 fn setup_and_spawn_actor(
184 context: &deterministic::Context,
185 provider: impl Provider<PublicKey = PublicKey>,
186 blocker: impl Blocker<PublicKey = PublicKey>,
187 signer: impl Signer<PublicKey = PublicKey>,
188 connection: (
189 Sender<PublicKey, deterministic::Context>,
190 Receiver<PublicKey>,
191 ),
192 consumer: Consumer<Key, Bytes>,
193 producer: Producer<Key, Bytes>,
194 ) -> Mailbox<Key, PublicKey> {
195 let public_key = signer.public_key();
196 let (engine, mailbox) = Engine::new(
197 context.with_label(&format!("actor_{public_key}")),
198 Config {
199 peer_provider: provider,
200 blocker,
201 consumer,
202 producer,
203 mailbox_size: MAILBOX_SIZE,
204 me: Some(public_key),
205 initial: INITIAL_DURATION,
206 timeout: TIMEOUT,
207 fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
208 priority_requests: false,
209 priority_responses: false,
210 },
211 );
212 engine.start(connection);
213
214 mailbox
215 }
216
217 #[test_traced]
221 fn test_fetch_success() {
222 let executor = deterministic::Runner::timed(Duration::from_secs(10));
223 executor.start(|context| async move {
224 let (mut oracle, mut schemes, peers, mut connections) =
225 setup_network_and_peers(&context, &[1, 2]).await;
226
227 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
228
229 let key = Key(2);
230 let mut prod2 = Producer::default();
231 prod2.insert(key.clone(), Bytes::from("data for key 2"));
232
233 let (cons1, mut cons_out1) = Consumer::new();
234
235 let scheme = schemes.remove(0);
236 let mut mailbox1 = setup_and_spawn_actor(
237 &context,
238 oracle.manager(),
239 oracle.control(scheme.public_key()),
240 scheme,
241 connections.remove(0),
242 cons1,
243 Producer::default(),
244 );
245
246 let scheme = schemes.remove(0);
247 let _mailbox2 = setup_and_spawn_actor(
248 &context,
249 oracle.manager(),
250 oracle.control(scheme.public_key()),
251 scheme,
252 connections.remove(0),
253 Consumer::dummy(),
254 prod2,
255 );
256
257 mailbox1.fetch(key.clone()).await;
258
259 let event = cons_out1.recv().await.unwrap();
260 match event {
261 Event::Success(key_actual, value) => {
262 assert_eq!(key_actual, key);
263 assert_eq!(value, Bytes::from("data for key 2"));
264 }
265 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
266 }
267 });
268 }
269
270 #[test_traced]
274 fn test_cancel_fetch() {
275 let executor = deterministic::Runner::timed(Duration::from_secs(10));
276 executor.start(|context| async move {
277 let (oracle, mut schemes, _peers, mut connections) =
278 setup_network_and_peers(&context, &[1]).await;
279
280 let (cons1, mut cons_out1) = Consumer::new();
281 let prod1 = Producer::default();
282
283 let scheme = schemes.remove(0);
284 let mut mailbox1 = setup_and_spawn_actor(
285 &context,
286 oracle.manager(),
287 oracle.control(scheme.public_key()),
288 scheme,
289 connections.remove(0),
290 cons1,
291 prod1,
292 );
293
294 let key = Key(3);
295 mailbox1.fetch(key.clone()).await;
296 mailbox1.cancel(key.clone()).await;
297
298 let event = cons_out1.recv().await.unwrap();
299 match event {
300 Event::Failed(key_actual) => {
301 assert_eq!(key_actual, key);
302 }
303 Event::Success(_, _) => panic!("Fetch should have been canceled"),
304 }
305 });
306 }
307
308 #[test_traced]
313 fn test_peer_no_data() {
314 let executor = deterministic::Runner::timed(Duration::from_secs(10));
315 executor.start(|context| async move {
316 let (mut oracle, mut schemes, peers, mut connections) =
317 setup_network_and_peers(&context, &[1, 2, 3]).await;
318
319 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
320 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
321
322 let prod1 = Producer::default();
323 let prod2 = Producer::default();
324 let mut prod3 = Producer::default();
325 let key = Key(3);
326 prod3.insert(key.clone(), Bytes::from("data for key 3"));
327
328 let (cons1, mut cons_out1) = Consumer::new();
329
330 let scheme = schemes.remove(0);
331 let mut mailbox1 = setup_and_spawn_actor(
332 &context,
333 oracle.manager(),
334 oracle.control(scheme.public_key()),
335 scheme,
336 connections.remove(0),
337 cons1,
338 prod1,
339 );
340
341 let scheme = schemes.remove(0);
342 let _mailbox2 = setup_and_spawn_actor(
343 &context,
344 oracle.manager(),
345 oracle.control(scheme.public_key()),
346 scheme,
347 connections.remove(0),
348 Consumer::dummy(),
349 prod2,
350 );
351
352 let scheme = schemes.remove(0);
353 let _mailbox3 = setup_and_spawn_actor(
354 &context,
355 oracle.manager(),
356 oracle.control(scheme.public_key()),
357 scheme,
358 connections.remove(0),
359 Consumer::dummy(),
360 prod3,
361 );
362
363 mailbox1.fetch(key.clone()).await;
364
365 let event = cons_out1.recv().await.unwrap();
366 match event {
367 Event::Success(key_actual, value) => {
368 assert_eq!(key_actual, key);
369 assert_eq!(value, Bytes::from("data for key 3"));
370 }
371 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
372 }
373 });
374 }
375
376 #[test_traced]
381 fn test_no_peers_available() {
382 let executor = deterministic::Runner::timed(Duration::from_secs(10));
383 executor.start(|context| async move {
384 let (oracle, mut schemes, _peers, mut connections) =
385 setup_network_and_peers(&context, &[1]).await;
386
387 let (cons1, mut cons_out1) = Consumer::new();
388 let prod1 = Producer::default();
389
390 let scheme = schemes.remove(0);
391 let mut mailbox1 = setup_and_spawn_actor(
392 &context,
393 oracle.manager(),
394 oracle.control(scheme.public_key()),
395 scheme,
396 connections.remove(0),
397 cons1,
398 prod1,
399 );
400
401 let key = Key(4);
402 mailbox1.fetch(key.clone()).await;
403 context.sleep(Duration::from_secs(5)).await;
404 mailbox1.cancel(key.clone()).await;
405
406 let event = cons_out1.recv().await.expect("Consumer channel closed");
407 match event {
408 Event::Failed(key_actual) => {
409 assert_eq!(key_actual, key);
410 }
411 Event::Success(_, _) => {
412 panic!("Fetch should have failed due to no peers")
413 }
414 }
415 });
416 }
417
418 #[test_traced]
422 fn test_concurrent_fetch_requests() {
423 let executor = deterministic::Runner::timed(Duration::from_secs(60));
424 executor.start(|context| async move {
425 let (mut oracle, mut schemes, peers, mut connections) =
426 setup_network_and_peers(&context, &[1, 2, 3]).await;
427
428 let key2 = Key(2);
429 let key3 = Key(3);
430 let mut prod2 = Producer::default();
431 prod2.insert(key2.clone(), Bytes::from("data for key 2"));
432 let mut prod3 = Producer::default();
433 prod3.insert(key3.clone(), Bytes::from("data for key 3"));
434
435 let (cons1, mut cons_out1) = Consumer::new();
436
437 let scheme = schemes.remove(0);
438 let mut mailbox1 = setup_and_spawn_actor(
439 &context,
440 oracle.manager(),
441 oracle.control(scheme.public_key()),
442 scheme,
443 connections.remove(0),
444 cons1,
445 Producer::default(),
446 );
447
448 let scheme = schemes.remove(0);
449 let _mailbox2 = setup_and_spawn_actor(
450 &context,
451 oracle.manager(),
452 oracle.control(scheme.public_key()),
453 scheme,
454 connections.remove(0),
455 Consumer::dummy(),
456 prod2,
457 );
458
459 let scheme = schemes.remove(0);
460 let _mailbox3 = setup_and_spawn_actor(
461 &context,
462 oracle.manager(),
463 oracle.control(scheme.public_key()),
464 scheme,
465 connections.remove(0),
466 Consumer::dummy(),
467 prod3,
468 );
469
470 add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 1).await;
472 add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 2).await;
473
474 for _ in 0..10 {
476 mailbox1.fetch(key2.clone()).await;
478 mailbox1.fetch(key3.clone()).await;
479
480 let mut events = Vec::new();
482 events.push(cons_out1.recv().await.expect("Consumer channel closed"));
483 events.push(cons_out1.recv().await.expect("Consumer channel closed"));
484
485 let mut found_key2 = false;
487 let mut found_key3 = false;
488 for event in events {
489 match event {
490 Event::Success(key_actual, value) => {
491 if key_actual == key2 {
492 assert_eq!(value, Bytes::from("data for key 2"));
493 found_key2 = true;
494 } else if key_actual == key3 {
495 assert_eq!(value, Bytes::from("data for key 3"));
496 found_key3 = true;
497 } else {
498 panic!("Unexpected key received");
499 }
500 }
501 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
502 }
503 }
504 assert!(found_key2 && found_key3,);
505 }
506 });
507 }
508
509 #[test_traced]
512 fn test_cancel() {
513 let executor = deterministic::Runner::timed(Duration::from_secs(10));
514 executor.start(|context| async move {
515 let (mut oracle, mut schemes, peers, mut connections) =
516 setup_network_and_peers(&context, &[1, 2]).await;
517
518 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
519
520 let key = Key(6);
521 let mut prod2 = Producer::default();
522 prod2.insert(key.clone(), Bytes::from("data for key 6"));
523
524 let (cons1, mut cons_out1) = Consumer::new();
525
526 let scheme = schemes.remove(0);
527 let mut mailbox1 = setup_and_spawn_actor(
528 &context,
529 oracle.manager(),
530 oracle.control(scheme.public_key()),
531 scheme,
532 connections.remove(0),
533 cons1,
534 Producer::default(),
535 );
536
537 let scheme = schemes.remove(0);
538 let _mailbox2 = setup_and_spawn_actor(
539 &context,
540 oracle.manager(),
541 oracle.control(scheme.public_key()),
542 scheme,
543 connections.remove(0),
544 Consumer::dummy(),
545 prod2,
546 );
547
548 mailbox1.cancel(key.clone()).await;
550 select! {
551 _ = cons_out1.recv() => {
552 panic!("unexpected event");
553 },
554 _ = context.sleep(Duration::from_millis(100)) => {},
555 };
556
557 mailbox1.fetch(key.clone()).await;
559 let event = cons_out1.recv().await.unwrap();
560 match event {
561 Event::Success(key_actual, value) => {
562 assert_eq!(key_actual, key);
563 assert_eq!(value, Bytes::from("data for key 6"));
564 }
565 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
566 }
567
568 mailbox1.cancel(key.clone()).await;
570 select! {
571 _ = cons_out1.recv() => {
572 panic!("unexpected event");
573 },
574 _ = context.sleep(Duration::from_millis(100)) => {},
575 };
576
577 let key = Key(7);
579 mailbox1.fetch(key.clone()).await;
580 mailbox1.cancel(key.clone()).await;
581
582 let event = cons_out1.recv().await.unwrap();
584 match event {
585 Event::Failed(key_actual) => {
586 assert_eq!(key_actual, key);
587 }
588 Event::Success(_, _) => panic!("Fetch should have been canceled"),
589 }
590 });
591 }
592
593 #[test_traced]
596 fn test_blocking_peer() {
597 let executor = deterministic::Runner::timed(Duration::from_secs(10));
598 executor.start(|context| async move {
599 let (mut oracle, mut schemes, peers, mut connections) =
600 setup_network_and_peers(&context, &[1, 2, 3]).await;
601
602 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
603 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
604 add_link(&mut oracle, LINK.clone(), &peers, 1, 2).await;
605
606 let key_a = Key(1);
607 let key_b = Key(2);
608 let invalid_data_a = Bytes::from("invalid for A");
609 let valid_data_a = Bytes::from("valid for A");
610 let valid_data_b = Bytes::from("valid for B");
611
612 let mut prod2 = Producer::default();
614 prod2.insert(key_a.clone(), invalid_data_a.clone());
615 prod2.insert(key_b.clone(), valid_data_b.clone());
616
617 let mut prod3 = Producer::default();
618 prod3.insert(key_a.clone(), valid_data_a.clone());
619
620 let (mut cons1, mut cons_out1) = Consumer::new();
622 cons1.add_expected(key_a.clone(), valid_data_a.clone());
623 cons1.add_expected(key_b.clone(), valid_data_b.clone());
624
625 let scheme = schemes.remove(0);
627 let mut mailbox1 = setup_and_spawn_actor(
628 &context,
629 oracle.manager(),
630 oracle.control(scheme.public_key()),
631 scheme,
632 connections.remove(0),
633 cons1,
634 Producer::default(),
635 );
636
637 let scheme = schemes.remove(0);
638 let _mailbox2 = setup_and_spawn_actor(
639 &context,
640 oracle.manager(),
641 oracle.control(scheme.public_key()),
642 scheme,
643 connections.remove(0),
644 Consumer::dummy(),
645 prod2,
646 );
647
648 let scheme = schemes.remove(0);
649 let _mailbox3 = setup_and_spawn_actor(
650 &context,
651 oracle.manager(),
652 oracle.control(scheme.public_key()),
653 scheme,
654 connections.remove(0),
655 Consumer::dummy(),
656 prod3,
657 );
658
659 for _ in 0..20 {
661 mailbox1.fetch(key_a.clone()).await;
663
664 let event = cons_out1.recv().await.unwrap();
666 match event {
667 Event::Success(key_actual, value) => {
668 assert_eq!(key_actual, key_a);
669 assert_eq!(value, valid_data_a);
670 }
671 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
672 }
673 }
674
675 mailbox1.fetch(key_b.clone()).await;
677
678 context.sleep(Duration::from_secs(5)).await;
680
681 mailbox1.cancel(key_b.clone()).await;
683
684 let event = cons_out1.recv().await.unwrap();
686 match event {
687 Event::Failed(key_actual) => {
688 assert_eq!(key_actual, key_b);
689 }
690 Event::Success(_, _) => panic!("Fetch should have been canceled"),
691 }
692
693 let blocked = oracle.blocked().await.unwrap();
695 assert_eq!(blocked.len(), 1);
696 assert_eq!(blocked[0].0, peers[0]);
697 assert_eq!(blocked[0].1, peers[1]);
698 });
699 }
700
701 #[test_traced]
705 fn test_duplicate_fetch_request() {
706 let executor = deterministic::Runner::timed(Duration::from_secs(10));
707 executor.start(|context| async move {
708 let (mut oracle, mut schemes, peers, mut connections) =
709 setup_network_and_peers(&context, &[1, 2]).await;
710
711 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
712
713 let key = Key(5);
714 let mut prod2 = Producer::default();
715 prod2.insert(key.clone(), Bytes::from("data for key 5"));
716
717 let (cons1, mut cons_out1) = Consumer::new();
718
719 let scheme = schemes.remove(0);
720 let mut mailbox1 = setup_and_spawn_actor(
721 &context,
722 oracle.manager(),
723 oracle.control(scheme.public_key()),
724 scheme,
725 connections.remove(0),
726 cons1,
727 Producer::default(),
728 );
729
730 let scheme = schemes.remove(0);
731 let _mailbox2 = setup_and_spawn_actor(
732 &context,
733 oracle.manager(),
734 oracle.control(scheme.public_key()),
735 scheme,
736 connections.remove(0),
737 Consumer::dummy(),
738 prod2,
739 );
740
741 mailbox1.fetch(key.clone()).await;
743 mailbox1.fetch(key.clone()).await;
744
745 let event = cons_out1.recv().await.unwrap();
747 match event {
748 Event::Success(key_actual, value) => {
749 assert_eq!(key_actual, key);
750 assert_eq!(value, Bytes::from("data for key 5"));
751 }
752 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
753 }
754
755 select! {
757 _ = cons_out1.recv() => {
758 panic!("Unexpected second event received for duplicate fetch");
759 },
760 _ = context.sleep(Duration::from_millis(500)) => {
761 },
763 };
764 });
765 }
766
767 #[test_traced]
771 fn test_changing_peer_sets() {
772 let executor = deterministic::Runner::timed(Duration::from_secs(10));
773 executor.start(|context| async move {
774 let (mut oracle, mut schemes, peers, mut connections) =
775 setup_network_and_peers(&context, &[1, 2, 3]).await;
776
777 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
778 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
779
780 let key1 = Key(1);
781 let key2 = Key(2);
782
783 let mut prod2 = Producer::default();
784 prod2.insert(key1.clone(), Bytes::from("data from peer 2"));
785
786 let mut prod3 = Producer::default();
787 prod3.insert(key2.clone(), Bytes::from("data from peer 3"));
788
789 let (cons1, mut cons_out1) = Consumer::new();
790
791 let scheme = schemes.remove(0);
792 let mut mailbox1 = setup_and_spawn_actor(
793 &context,
794 oracle.manager(),
795 oracle.control(scheme.public_key()),
796 scheme,
797 connections.remove(0),
798 cons1,
799 Producer::default(),
800 );
801
802 let scheme = schemes.remove(0);
803 let _mailbox2 = setup_and_spawn_actor(
804 &context,
805 oracle.manager(),
806 oracle.control(scheme.public_key()),
807 scheme,
808 connections.remove(0),
809 Consumer::dummy(),
810 prod2,
811 );
812
813 mailbox1.fetch(key1.clone()).await;
815
816 let event = cons_out1.recv().await.unwrap();
818 match event {
819 Event::Success(key_actual, value) => {
820 assert_eq!(key_actual, key1);
821 assert_eq!(value, Bytes::from("data from peer 2"));
822 }
823 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
824 }
825
826 let scheme = schemes.remove(0);
828 let _mailbox3 = setup_and_spawn_actor(
829 &context,
830 oracle.manager(),
831 oracle.control(scheme.public_key()),
832 scheme,
833 connections.remove(0),
834 Consumer::dummy(),
835 prod3,
836 );
837
838 context.sleep(Duration::from_millis(200)).await;
840
841 mailbox1.fetch(key2.clone()).await;
843
844 let event = cons_out1.recv().await.unwrap();
846 match event {
847 Event::Success(key_actual, value) => {
848 assert_eq!(key_actual, key2);
849 assert_eq!(value, Bytes::from("data from peer 3"));
850 }
851 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
852 }
853 });
854 }
855
856 #[test_traced]
857 fn test_fetch_targeted() {
858 let executor = deterministic::Runner::timed(Duration::from_secs(10));
859 executor.start(|context| async move {
860 let (mut oracle, mut schemes, peers, mut connections) =
861 setup_network_and_peers(&context, &[1, 2, 3]).await;
862
863 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
864 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
865
866 let key = Key(1);
867 let invalid_data = Bytes::from("invalid data");
868 let valid_data = Bytes::from("valid data");
869
870 let mut prod2 = Producer::default();
872 prod2.insert(key.clone(), invalid_data.clone());
873
874 let mut prod3 = Producer::default();
875 prod3.insert(key.clone(), valid_data.clone());
876
877 let (mut cons1, mut cons_out1) = Consumer::new();
879 cons1.add_expected(key.clone(), valid_data.clone());
880
881 let scheme = schemes.remove(0);
882 let mut mailbox1 = setup_and_spawn_actor(
883 &context,
884 oracle.manager(),
885 oracle.control(scheme.public_key()),
886 scheme,
887 connections.remove(0),
888 cons1,
889 Producer::default(),
890 );
891
892 let scheme = schemes.remove(0);
893 let _mailbox2 = setup_and_spawn_actor(
894 &context,
895 oracle.manager(),
896 oracle.control(scheme.public_key()),
897 scheme,
898 connections.remove(0),
899 Consumer::dummy(),
900 prod2,
901 );
902
903 let scheme = schemes.remove(0);
904 let _mailbox3 = setup_and_spawn_actor(
905 &context,
906 oracle.manager(),
907 oracle.control(scheme.public_key()),
908 scheme,
909 connections.remove(0),
910 Consumer::dummy(),
911 prod3,
912 );
913
914 context.sleep(Duration::from_millis(100)).await;
916
917 mailbox1
921 .fetch_targeted(
922 key.clone(),
923 non_empty_vec![peers[1].clone(), peers[2].clone()],
924 )
925 .await;
926
927 let event = cons_out1.recv().await.unwrap();
929 match event {
930 Event::Success(key_actual, value) => {
931 assert_eq!(key_actual, key);
932 assert_eq!(value, valid_data);
933 }
934 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
935 }
936
937 let blocked = oracle.blocked().await.unwrap();
939 assert_eq!(blocked.len(), 1);
940 assert_eq!(blocked[0].0, peers[0]);
941 assert_eq!(blocked[0].1, peers[1]);
942
943 let metrics = context.encode();
945 assert!(metrics.contains("_fetch_total{status=\"Success\"} 1"));
946 });
947 }
948
949 #[test_traced]
950 fn test_fetch_targeted_no_fallback() {
951 let executor = deterministic::Runner::timed(Duration::from_secs(10));
952 executor.start(|context| async move {
953 let (mut oracle, mut schemes, peers, mut connections) =
954 setup_network_and_peers(&context, &[1, 2, 3, 4]).await;
955
956 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
957 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
958 add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;
959
960 let key = Key(1);
961
962 let mut prod4 = Producer::default();
964 prod4.insert(key.clone(), Bytes::from("data from peer 4"));
965
966 let (cons1, mut cons_out1) = Consumer::new();
967
968 let scheme = schemes.remove(0);
969 let mut mailbox1 = setup_and_spawn_actor(
970 &context,
971 oracle.manager(),
972 oracle.control(scheme.public_key()),
973 scheme,
974 connections.remove(0),
975 cons1,
976 Producer::default(),
977 );
978
979 let scheme = schemes.remove(0);
980 let _mailbox2 = setup_and_spawn_actor(
981 &context,
982 oracle.manager(),
983 oracle.control(scheme.public_key()),
984 scheme,
985 connections.remove(0),
986 Consumer::dummy(),
987 Producer::default(), );
989
990 let scheme = schemes.remove(0);
991 let _mailbox3 = setup_and_spawn_actor(
992 &context,
993 oracle.manager(),
994 oracle.control(scheme.public_key()),
995 scheme,
996 connections.remove(0),
997 Consumer::dummy(),
998 Producer::default(), );
1000
1001 let scheme = schemes.remove(0);
1002 let _mailbox4 = setup_and_spawn_actor(
1003 &context,
1004 oracle.manager(),
1005 oracle.control(scheme.public_key()),
1006 scheme,
1007 connections.remove(0),
1008 Consumer::dummy(),
1009 prod4,
1010 );
1011
1012 context.sleep(Duration::from_millis(100)).await;
1014
1015 mailbox1
1018 .fetch_targeted(
1019 key.clone(),
1020 non_empty_vec![peers[1].clone(), peers[2].clone()],
1021 )
1022 .await;
1023
1024 select! {
1027 event = cons_out1.recv() => {
1028 panic!("Fetch should not succeed, but got: {event:?}");
1029 },
1030 _ = context.sleep(Duration::from_secs(3)) => {
1031 },
1033 };
1034 });
1035 }
1036
1037 #[test_traced]
1038 fn test_fetch_all_targeted() {
1039 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1040 executor.start(|context| async move {
1041 let (mut oracle, mut schemes, peers, mut connections) =
1042 setup_network_and_peers(&context, &[1, 2, 3, 4]).await;
1043
1044 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1045 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1046 add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;
1047
1048 let key1 = Key(1);
1049 let key2 = Key(2);
1050 let key3 = Key(3);
1051
1052 let mut prod2 = Producer::default();
1054 prod2.insert(key1.clone(), Bytes::from("data for key 1"));
1055
1056 let mut prod3 = Producer::default();
1058 prod3.insert(key3.clone(), Bytes::from("data for key 3"));
1059
1060 let mut prod4 = Producer::default();
1062 prod4.insert(key2.clone(), Bytes::from("data for key 2"));
1063
1064 let (mut cons1, mut cons_out1) = Consumer::new();
1066 cons1.add_expected(key1.clone(), Bytes::from("data for key 1"));
1067 cons1.add_expected(key2.clone(), Bytes::from("data for key 2"));
1068 cons1.add_expected(key3.clone(), Bytes::from("data for key 3"));
1069
1070 let scheme = schemes.remove(0);
1071 let mut mailbox1 = setup_and_spawn_actor(
1072 &context,
1073 oracle.manager(),
1074 oracle.control(scheme.public_key()),
1075 scheme,
1076 connections.remove(0),
1077 cons1,
1078 Producer::default(),
1079 );
1080
1081 let scheme = schemes.remove(0);
1082 let _mailbox2 = setup_and_spawn_actor(
1083 &context,
1084 oracle.manager(),
1085 oracle.control(scheme.public_key()),
1086 scheme,
1087 connections.remove(0),
1088 Consumer::dummy(),
1089 prod2,
1090 );
1091
1092 let scheme = schemes.remove(0);
1093 let _mailbox3 = setup_and_spawn_actor(
1094 &context,
1095 oracle.manager(),
1096 oracle.control(scheme.public_key()),
1097 scheme,
1098 connections.remove(0),
1099 Consumer::dummy(),
1100 prod3,
1101 );
1102
1103 let scheme = schemes.remove(0);
1104 let _mailbox4 = setup_and_spawn_actor(
1105 &context,
1106 oracle.manager(),
1107 oracle.control(scheme.public_key()),
1108 scheme,
1109 connections.remove(0),
1110 Consumer::dummy(),
1111 prod4,
1112 );
1113
1114 context.sleep(Duration::from_millis(100)).await;
1116
1117 mailbox1
1122 .fetch_all_targeted(vec![
1123 (key1.clone(), non_empty_vec![peers[1].clone()]), (key2.clone(), non_empty_vec![peers[3].clone()]), ])
1126 .await;
1127 mailbox1.fetch(key3.clone()).await; let mut results = HashMap::new();
1131 for _ in 0..3 {
1132 let event = cons_out1.recv().await.unwrap();
1133 match event {
1134 Event::Success(key, value) => {
1135 results.insert(key, value);
1136 }
1137 Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
1138 }
1139 }
1140
1141 assert_eq!(results.len(), 3);
1143 assert_eq!(results.get(&key1).unwrap(), &Bytes::from("data for key 1"));
1144 assert_eq!(results.get(&key2).unwrap(), &Bytes::from("data for key 2"));
1145 assert_eq!(results.get(&key3).unwrap(), &Bytes::from("data for key 3"));
1146
1147 let metrics = context.encode();
1149 assert!(metrics.contains("_fetch_total{status=\"Success\"} 3"));
1150 });
1151 }
1152
1153 #[test_traced]
1156 fn test_fetch_clears_targets() {
1157 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1158 executor.start(|context| async move {
1159 let (mut oracle, mut schemes, peers, mut connections) =
1160 setup_network_and_peers(&context, &[1, 2, 3]).await;
1161
1162 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1163 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1164
1165 let key = Key(1);
1166 let valid_data = Bytes::from("valid data");
1167
1168 let mut prod3 = Producer::default();
1170 prod3.insert(key.clone(), valid_data.clone());
1171
1172 let (cons1, mut cons_out1) = Consumer::new();
1173
1174 let scheme = schemes.remove(0);
1175 let mut mailbox1 = setup_and_spawn_actor(
1176 &context,
1177 oracle.manager(),
1178 oracle.control(scheme.public_key()),
1179 scheme,
1180 connections.remove(0),
1181 cons1,
1182 Producer::default(),
1183 );
1184
1185 let scheme = schemes.remove(0);
1186 let _mailbox2 = setup_and_spawn_actor(
1187 &context,
1188 oracle.manager(),
1189 oracle.control(scheme.public_key()),
1190 scheme,
1191 connections.remove(0),
1192 Consumer::dummy(),
1193 Producer::default(), );
1195
1196 let scheme = schemes.remove(0);
1197 let _mailbox3 = setup_and_spawn_actor(
1198 &context,
1199 oracle.manager(),
1200 oracle.control(scheme.public_key()),
1201 scheme,
1202 connections.remove(0),
1203 Consumer::dummy(),
1204 prod3,
1205 );
1206
1207 context.sleep(Duration::from_millis(100)).await;
1209
1210 mailbox1
1212 .fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()])
1213 .await;
1214
1215 context.sleep(Duration::from_millis(500)).await;
1217
1218 mailbox1.fetch(key.clone()).await;
1220
1221 let event = cons_out1.recv().await.unwrap();
1223 match event {
1224 Event::Success(key_actual, value) => {
1225 assert_eq!(key_actual, key);
1226 assert_eq!(value, valid_data);
1227 }
1228 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1229 }
1230 });
1231 }
1232
1233 #[test_traced]
1234 fn test_fetch_targeted_does_not_restrict_all() {
1235 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1236 executor.start(|context| async move {
1237 let (mut oracle, mut schemes, peers, mut connections) =
1238 setup_network_and_peers(&context, &[1, 2, 3]).await;
1239
1240 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1241 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1242
1243 let key = Key(1);
1244 let valid_data = Bytes::from("valid data");
1245
1246 let mut prod3 = Producer::default();
1248 prod3.insert(key.clone(), valid_data.clone());
1249
1250 let (cons1, mut cons_out1) = Consumer::new();
1251
1252 let scheme = schemes.remove(0);
1253 let mut mailbox1 = setup_and_spawn_actor(
1254 &context,
1255 oracle.manager(),
1256 oracle.control(scheme.public_key()),
1257 scheme,
1258 connections.remove(0),
1259 cons1,
1260 Producer::default(),
1261 );
1262
1263 let scheme = schemes.remove(0);
1264 let _mailbox2 = setup_and_spawn_actor(
1265 &context,
1266 oracle.manager(),
1267 oracle.control(scheme.public_key()),
1268 scheme,
1269 connections.remove(0),
1270 Consumer::dummy(),
1271 Producer::default(), );
1273
1274 let scheme = schemes.remove(0);
1275 let _mailbox3 = setup_and_spawn_actor(
1276 &context,
1277 oracle.manager(),
1278 oracle.control(scheme.public_key()),
1279 scheme,
1280 connections.remove(0),
1281 Consumer::dummy(),
1282 prod3,
1283 );
1284
1285 context.sleep(Duration::from_millis(100)).await;
1287
1288 mailbox1.fetch(key.clone()).await;
1290
1291 context.sleep(Duration::from_millis(50)).await;
1293
1294 mailbox1
1297 .fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()])
1298 .await;
1299
1300 let event = cons_out1.recv().await.unwrap();
1303 match event {
1304 Event::Success(key_actual, value) => {
1305 assert_eq!(key_actual, key);
1306 assert_eq!(value, valid_data);
1307 }
1308 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1309 }
1310 });
1311 }
1312
1313 #[test_traced]
1314 fn test_retain() {
1315 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1316 executor.start(|context| async move {
1317 let (mut oracle, mut schemes, peers, mut connections) =
1318 setup_network_and_peers(&context, &[1, 2]).await;
1319
1320 let key = Key(5);
1321 let mut prod2 = Producer::default();
1322 prod2.insert(key.clone(), Bytes::from("data for key 5"));
1323
1324 let (cons1, mut cons_out1) = Consumer::new();
1325
1326 let scheme = schemes.remove(0);
1327 let mut mailbox1 = setup_and_spawn_actor(
1328 &context,
1329 oracle.manager(),
1330 oracle.control(scheme.public_key()),
1331 scheme,
1332 connections.remove(0),
1333 cons1,
1334 Producer::default(),
1335 );
1336
1337 let scheme = schemes.remove(0);
1338 let _mailbox2 = setup_and_spawn_actor(
1339 &context,
1340 oracle.manager(),
1341 oracle.control(scheme.public_key()),
1342 scheme,
1343 connections.remove(0),
1344 Consumer::dummy(),
1345 prod2,
1346 );
1347
1348 mailbox1.retain(|_| true).await;
1350 select! {
1351 _ = cons_out1.recv() => {
1352 panic!("unexpected event");
1353 },
1354 _ = context.sleep(Duration::from_millis(100)) => {},
1355 };
1356
1357 mailbox1.fetch(key.clone()).await;
1359
1360 let key_clone = key.clone();
1363 mailbox1.retain(move |k| k != &key_clone).await;
1364
1365 let event = cons_out1.recv().await.unwrap();
1367 match event {
1368 Event::Failed(key_actual) => {
1369 assert_eq!(key_actual, key);
1370 }
1371 Event::Success(_, _) => panic!("Fetch should have been retained out"),
1372 }
1373
1374 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1376
1377 mailbox1.fetch(key.clone()).await;
1380
1381 let event = cons_out1.recv().await.unwrap();
1383 match event {
1384 Event::Success(key_actual, value) => {
1385 assert_eq!(key_actual, key);
1386 assert_eq!(value, Bytes::from("data for key 5"));
1387 }
1388 Event::Failed(_) => unreachable!(),
1389 }
1390 });
1391 }
1392
1393 #[test_traced]
1394 fn test_clear() {
1395 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1396 executor.start(|context| async move {
1397 let (mut oracle, mut schemes, peers, mut connections) =
1398 setup_network_and_peers(&context, &[1, 2]).await;
1399
1400 let key = Key(6);
1402 let mut prod2 = Producer::default();
1403 prod2.insert(key.clone(), Bytes::from("data for key 6"));
1404
1405 let (cons1, mut cons_out1) = Consumer::new();
1406
1407 let scheme = schemes.remove(0);
1408 let mut mailbox1 = setup_and_spawn_actor(
1409 &context,
1410 oracle.manager(),
1411 oracle.control(scheme.public_key()),
1412 scheme,
1413 connections.remove(0),
1414 cons1,
1415 Producer::default(),
1416 );
1417
1418 let scheme = schemes.remove(0);
1419 let _mailbox2 = setup_and_spawn_actor(
1420 &context,
1421 oracle.manager(),
1422 oracle.control(scheme.public_key()),
1423 scheme,
1424 connections.remove(0),
1425 Consumer::dummy(),
1426 prod2,
1427 );
1428
1429 mailbox1.clear().await;
1431 select! {
1432 _ = cons_out1.recv() => {
1433 panic!("unexpected event");
1434 },
1435 _ = context.sleep(Duration::from_millis(100)) => {},
1436 };
1437
1438 mailbox1.fetch(key.clone()).await;
1440
1441 mailbox1.clear().await;
1443
1444 let event = cons_out1.recv().await.unwrap();
1446 match event {
1447 Event::Failed(key_actual) => {
1448 assert_eq!(key_actual, key);
1449 }
1450 Event::Success(_, _) => panic!("Fetch should have been cleared"),
1451 }
1452
1453 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1455
1456 mailbox1.fetch(key.clone()).await;
1459
1460 let event = cons_out1.recv().await.unwrap();
1462 match event {
1463 Event::Success(key_actual, value) => {
1464 assert_eq!(key_actual, key);
1465 assert_eq!(value, Bytes::from("data for key 6"));
1466 }
1467 Event::Failed(_) => unreachable!(),
1468 }
1469 });
1470 }
1471
1472 #[test_traced]
1476 fn test_rate_limit_spillover() {
1477 let executor = deterministic::Runner::timed(Duration::from_secs(30));
1478 executor.start(|context| async move {
1479 let (mut oracle, mut schemes, peers, mut connections) =
1481 setup_network_and_peers_with_rate_limit(
1482 &context,
1483 &[1, 2, 3],
1484 Quota::per_second(NZU32!(1)),
1485 )
1486 .await;
1487
1488 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1490 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1491
1492 let mut prod2 = Producer::default();
1494 let mut prod3 = Producer::default();
1495 prod2.insert(Key(0), Bytes::from("data for key 0"));
1496 prod2.insert(Key(1), Bytes::from("data for key 1"));
1497 prod3.insert(Key(0), Bytes::from("data for key 0"));
1498 prod3.insert(Key(1), Bytes::from("data for key 1"));
1499
1500 let (cons1, mut cons_out1) = Consumer::new();
1501
1502 let scheme = schemes.remove(0);
1504 let mut mailbox1 = setup_and_spawn_actor(
1505 &context,
1506 oracle.manager(),
1507 oracle.control(scheme.public_key()),
1508 scheme,
1509 connections.remove(0),
1510 cons1,
1511 Producer::default(),
1512 );
1513
1514 let scheme = schemes.remove(0);
1516 let _mailbox2 = setup_and_spawn_actor(
1517 &context,
1518 oracle.manager(),
1519 oracle.control(scheme.public_key()),
1520 scheme,
1521 connections.remove(0),
1522 Consumer::dummy(),
1523 prod2,
1524 );
1525
1526 let scheme = schemes.remove(0);
1528 let _mailbox3 = setup_and_spawn_actor(
1529 &context,
1530 oracle.manager(),
1531 oracle.control(scheme.public_key()),
1532 scheme,
1533 connections.remove(0),
1534 Consumer::dummy(),
1535 prod3,
1536 );
1537
1538 context.sleep(Duration::from_millis(100)).await;
1540 let start = context.current();
1541
1542 mailbox1.fetch(Key(0)).await;
1546 mailbox1.fetch(Key(1)).await;
1547
1548 let mut results = HashMap::new();
1550 for _ in 0..2 {
1551 let event = cons_out1.recv().await.unwrap();
1552 match event {
1553 Event::Success(key, value) => {
1554 results.insert(key.clone(), value);
1555 }
1556 Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
1557 }
1558 }
1559
1560 assert_eq!(results.len(), 2);
1562 assert_eq!(
1563 results.get(&Key(0)).unwrap(),
1564 &Bytes::from("data for key 0")
1565 );
1566 assert_eq!(
1567 results.get(&Key(1)).unwrap(),
1568 &Bytes::from("data for key 1")
1569 );
1570
1571 let elapsed = context.current().duration_since(start).unwrap();
1574 assert!(
1575 elapsed < Duration::from_millis(500),
1576 "Expected quick completion via spill-over, but took {elapsed:?}"
1577 );
1578 });
1579 }
1580
1581 #[test_traced]
1585 fn test_rate_limit_retry_after_reset() {
1586 let executor = deterministic::Runner::timed(Duration::from_secs(30));
1587 executor.start(|context| async move {
1588 let (mut oracle, mut schemes, peers, mut connections) =
1590 setup_network_and_peers_with_rate_limit(
1591 &context,
1592 &[1, 2],
1593 Quota::per_second(NZU32!(1)),
1594 )
1595 .await;
1596
1597 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1598
1599 let mut prod2 = Producer::default();
1601 prod2.insert(Key(1), Bytes::from("data for key 1"));
1602 prod2.insert(Key(2), Bytes::from("data for key 2"));
1603 prod2.insert(Key(3), Bytes::from("data for key 3"));
1604
1605 let (cons1, mut cons_out1) = Consumer::new();
1606
1607 let scheme = schemes.remove(0);
1608 let mut mailbox1 = setup_and_spawn_actor(
1609 &context,
1610 oracle.manager(),
1611 oracle.control(scheme.public_key()),
1612 scheme,
1613 connections.remove(0),
1614 cons1,
1615 Producer::default(),
1616 );
1617
1618 let scheme = schemes.remove(0);
1619 let _mailbox2 = setup_and_spawn_actor(
1620 &context,
1621 oracle.manager(),
1622 oracle.control(scheme.public_key()),
1623 scheme,
1624 connections.remove(0),
1625 Consumer::dummy(),
1626 prod2,
1627 );
1628
1629 context.sleep(Duration::from_millis(100)).await;
1631 let start = context.current();
1632
1633 mailbox1.fetch(Key(1)).await;
1636 mailbox1.fetch(Key(2)).await;
1637 mailbox1.fetch(Key(3)).await;
1638
1639 let mut results = HashMap::new();
1641 for _ in 0..3 {
1642 let event = cons_out1.recv().await.unwrap();
1643 match event {
1644 Event::Success(key, value) => {
1645 results.insert(key.clone(), value);
1646 }
1647 Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
1648 }
1649 }
1650
1651 assert_eq!(results.len(), 3);
1652 for i in 1..=3 {
1653 assert_eq!(
1654 results.get(&Key(i)).unwrap(),
1655 &Bytes::from(format!("data for key {}", i))
1656 );
1657 }
1658
1659 let elapsed = context.current().duration_since(start).unwrap();
1663 assert!(
1664 elapsed > Duration::from_secs(2),
1665 "Expected rate limiting to cause delay > 2s, but took {elapsed:?}"
1666 );
1667 });
1668 }
1669
1670 #[test_traced]
1674 fn test_self_exclusion() {
1675 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1676 executor.start(|context| async move {
1677 let (mut oracle, mut schemes, peers, mut connections) =
1678 setup_network_and_peers(&context, &[1, 2]).await;
1679
1680 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1681
1682 let key = Key(1);
1683 let data = Bytes::from("shared data");
1684
1685 let mut prod1 = Producer::default();
1687 prod1.insert(key.clone(), data.clone());
1688 let mut prod2 = Producer::default();
1689 prod2.insert(key.clone(), data.clone());
1690
1691 let (cons1, mut cons_out1) = Consumer::new();
1692
1693 let scheme = schemes.remove(0);
1695 let mut mailbox1 = setup_and_spawn_actor(
1696 &context,
1697 oracle.manager(),
1698 oracle.control(scheme.public_key()),
1699 scheme,
1700 connections.remove(0),
1701 cons1,
1702 prod1, );
1704
1705 let scheme = schemes.remove(0);
1707 let _mailbox2 = setup_and_spawn_actor(
1708 &context,
1709 oracle.manager(),
1710 oracle.control(scheme.public_key()),
1711 scheme,
1712 connections.remove(0),
1713 Consumer::dummy(),
1714 prod2,
1715 );
1716
1717 context.sleep(Duration::from_millis(100)).await;
1719
1720 mailbox1.fetch(key.clone()).await;
1722
1723 let event = cons_out1.recv().await.unwrap();
1725 match event {
1726 Event::Success(key_actual, value) => {
1727 assert_eq!(key_actual, key);
1728 assert_eq!(value, data);
1729 }
1730 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1731 }
1732 });
1733 }
1734
1735 #[allow(clippy::type_complexity)]
1736 fn spawn_actors_with_handles(
1737 context: deterministic::Context,
1738 oracle: &Oracle<PublicKey, deterministic::Context>,
1739 schemes: Vec<PrivateKey>,
1740 connections: Vec<(
1741 Sender<PublicKey, deterministic::Context>,
1742 Receiver<PublicKey>,
1743 )>,
1744 consumers: Vec<Consumer<Key, Bytes>>,
1745 producers: Vec<Producer<Key, Bytes>>,
1746 ) -> (
1747 Vec<Mailbox<Key, PublicKey>>,
1748 Vec<commonware_runtime::Handle<()>>,
1749 ) {
1750 let actor_context = context.with_label("actor");
1751 let mut mailboxes = Vec::new();
1752 let mut handles = Vec::new();
1753
1754 for (idx, ((scheme, conn), (consumer, producer))) in schemes
1755 .into_iter()
1756 .zip(connections)
1757 .zip(consumers.into_iter().zip(producers))
1758 .enumerate()
1759 {
1760 let ctx = actor_context.with_label(&format!("peer_{idx}"));
1761 let public_key = scheme.public_key();
1762 let (engine, mailbox) = Engine::new(
1763 ctx,
1764 Config {
1765 peer_provider: oracle.manager(),
1766 blocker: oracle.control(public_key.clone()),
1767 consumer,
1768 producer,
1769 mailbox_size: MAILBOX_SIZE,
1770 me: Some(public_key),
1771 initial: INITIAL_DURATION,
1772 timeout: TIMEOUT,
1773 fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
1774 priority_requests: false,
1775 priority_responses: false,
1776 },
1777 );
1778 handles.push(engine.start(conn));
1779 mailboxes.push(mailbox);
1780 }
1781
1782 (mailboxes, handles)
1783 }
1784
1785 #[test_traced]
1786 fn test_operations_after_shutdown_do_not_panic() {
1787 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1788 executor.start(|context| async move {
1789 let (mut oracle, schemes, peers, connections) =
1790 setup_network_and_peers(&context, &[1, 2]).await;
1791
1792 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1793
1794 let key = Key(1);
1795 let mut prod2 = Producer::default();
1796 prod2.insert(key.clone(), Bytes::from("data for key 1"));
1797
1798 let (cons1, mut cons_out1) = Consumer::new();
1799
1800 let (mut mailboxes, handles) = spawn_actors_with_handles(
1801 context.clone(),
1802 &oracle,
1803 schemes,
1804 connections,
1805 vec![cons1, Consumer::dummy()],
1806 vec![Producer::default(), prod2],
1807 );
1808
1809 mailboxes[0].fetch(key.clone()).await;
1811 let event = cons_out1.recv().await.unwrap();
1812 match event {
1813 Event::Success(_, value) => assert_eq!(value, Bytes::from("data for key 1")),
1814 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1815 }
1816
1817 for handle in handles {
1819 handle.abort();
1820 }
1821 context.sleep(Duration::from_millis(100)).await;
1822
1823 let key2 = Key(2);
1827 mailboxes[0].fetch(key2.clone()).await;
1828
1829 mailboxes[0].cancel(key2.clone()).await;
1831
1832 mailboxes[0].clear().await;
1834
1835 mailboxes[0].retain(|_| true).await;
1837
1838 mailboxes[0]
1840 .fetch_targeted(Key(3), non_empty_vec![peers[1].clone()])
1841 .await;
1842 });
1843 }
1844
1845 fn clean_shutdown(seed: u64) {
1846 let cfg = deterministic::Config::default()
1847 .with_seed(seed)
1848 .with_timeout(Some(Duration::from_secs(30)));
1849 let executor = deterministic::Runner::new(cfg);
1850 executor.start(|context| async move {
1851 let (mut oracle, schemes, peers, connections) =
1852 setup_network_and_peers(&context, &[1, 2]).await;
1853
1854 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1855
1856 let key = Key(1);
1857 let mut prod2 = Producer::default();
1858 prod2.insert(key.clone(), Bytes::from("data for key 1"));
1859
1860 let (cons1, mut cons_out1) = Consumer::new();
1861
1862 let (mut mailboxes, handles) = spawn_actors_with_handles(
1863 context.clone(),
1864 &oracle,
1865 schemes,
1866 connections,
1867 vec![cons1, Consumer::dummy()],
1868 vec![Producer::default(), prod2],
1869 );
1870
1871 context.sleep(Duration::from_millis(100)).await;
1873
1874 let running_before = count_running_tasks(&context, "actor");
1876 assert!(
1877 running_before > 0,
1878 "at least one actor task should be running"
1879 );
1880
1881 mailboxes[0].fetch(key.clone()).await;
1883 let event = cons_out1.recv().await.unwrap();
1884 match event {
1885 Event::Success(_, value) => assert_eq!(value, Bytes::from("data for key 1")),
1886 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1887 }
1888
1889 for handle in handles {
1891 handle.abort();
1892 }
1893 context.sleep(Duration::from_millis(100)).await;
1894
1895 let running_after = count_running_tasks(&context, "actor");
1897 assert_eq!(
1898 running_after, 0,
1899 "all actor tasks should be stopped, but {running_after} still running"
1900 );
1901 });
1902 }
1903
/// Runs the `clean_shutdown` scenario once for each seed in `0..25`, in ascending order.
#[test]
fn test_clean_shutdown() {
    (0..25).for_each(clean_shutdown);
}
1910}