1use bytes::Bytes;
44use commonware_utils::Span;
45use futures::channel::oneshot;
46use std::future::Future;
47
48mod config;
49pub use config::Config;
50mod engine;
51pub use engine::Engine;
52mod fetcher;
53mod ingress;
54pub use ingress::Mailbox;
55mod metrics;
56mod wire;
57
58#[cfg(feature = "mocks")]
59pub mod mocks;
60
/// Produces data for a given key.
///
/// Implementations must be `Clone + Send + 'static`.
pub trait Producer: Clone + Send + 'static {
    /// Type of key that identifies the data to produce.
    type Key: Span;

    /// Starts producing the data identified by `key`.
    ///
    /// The returned future resolves to a [`oneshot::Receiver`] on which the data is
    /// delivered as [`Bytes`].
    // NOTE(review): presumably dropping the sender without sending means "no data for
    // this key" — confirm against the engine that consumes this receiver.
    fn produce(&mut self, key: Self::Key) -> impl Future<Output = oneshot::Receiver<Bytes>> + Send;
}
69
70#[cfg(test)]
71mod tests {
72 use super::{
73 mocks::{Consumer, Event, Key, Producer},
74 Config, Engine, Mailbox,
75 };
76 use crate::Resolver;
77 use bytes::Bytes;
78 use commonware_cryptography::{
79 ed25519::{PrivateKey, PublicKey},
80 Signer,
81 };
82 use commonware_macros::{select, test_traced};
83 use commonware_p2p::{
84 simulated::{Link, Network, Oracle, Receiver, Sender},
85 Blocker, Manager,
86 };
87 use commonware_runtime::{count_running_tasks, deterministic, Clock, Metrics, Quota, Runner};
88 use commonware_utils::{non_empty_vec, NZU32};
89 use futures::StreamExt;
90 use std::{collections::HashMap, num::NonZeroU32, time::Duration};
91
/// Mailbox capacity handed to every test engine via `Config::mailbox_size`.
const MAILBOX_SIZE: usize = 1024;
/// Default per-second quota used when registering each peer's channel.
const RATE_LIMIT: NonZeroU32 = NZU32!(10);
/// Value passed as `Config::initial` for every test engine.
const INITIAL_DURATION: Duration = Duration::from_millis(100);
/// Value passed as `Config::timeout` for every test engine.
const TIMEOUT: Duration = Duration::from_millis(400);
/// Value passed as `Config::fetch_retry_timeout` for every test engine.
const FETCH_RETRY_TIMEOUT: Duration = Duration::from_millis(100);
/// Link with 10ms latency, 1ms jitter, and a success rate of 1.0.
const LINK: Link = Link {
    latency: Duration::from_millis(10),
    jitter: Duration::from_millis(1),
    success_rate: 1.0,
};
/// Link with the same latency and jitter as `LINK` but a success rate of 0.5.
const LINK_UNRELIABLE: Link = Link {
    latency: Duration::from_millis(10),
    jitter: Duration::from_millis(1),
    success_rate: 0.5,
};
107
108 async fn setup_network_and_peers(
109 context: &deterministic::Context,
110 peer_seeds: &[u64],
111 ) -> (
112 Oracle<PublicKey, deterministic::Context>,
113 Vec<PrivateKey>,
114 Vec<PublicKey>,
115 Vec<(
116 Sender<PublicKey, deterministic::Context>,
117 Receiver<PublicKey>,
118 )>,
119 ) {
120 setup_network_and_peers_with_rate_limit(context, peer_seeds, Quota::per_second(RATE_LIMIT))
121 .await
122 }
123
124 async fn setup_network_and_peers_with_rate_limit(
125 context: &deterministic::Context,
126 peer_seeds: &[u64],
127 rate_limit: Quota,
128 ) -> (
129 Oracle<PublicKey, deterministic::Context>,
130 Vec<PrivateKey>,
131 Vec<PublicKey>,
132 Vec<(
133 Sender<PublicKey, deterministic::Context>,
134 Receiver<PublicKey>,
135 )>,
136 ) {
137 let (network, oracle) = Network::new(
138 context.with_label("network"),
139 commonware_p2p::simulated::Config {
140 max_size: 1024 * 1024,
141 disconnect_on_block: true,
142 tracked_peer_sets: Some(3),
143 },
144 );
145 network.start();
146
147 let schemes: Vec<PrivateKey> = peer_seeds
148 .iter()
149 .map(|seed| PrivateKey::from_seed(*seed))
150 .collect();
151 let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
152 let mut manager = oracle.manager();
153 manager.update(0, peers.clone().try_into().unwrap()).await;
154
155 let mut connections = Vec::new();
156 for peer in &peers {
157 let (sender, receiver) = oracle
158 .control(peer.clone())
159 .register(0, rate_limit)
160 .await
161 .unwrap();
162 connections.push((sender, receiver));
163 }
164
165 (oracle, schemes, peers, connections)
166 }
167
168 async fn add_link(
169 oracle: &mut Oracle<PublicKey, deterministic::Context>,
170 link: Link,
171 peers: &[PublicKey],
172 from: usize,
173 to: usize,
174 ) {
175 oracle
176 .add_link(peers[from].clone(), peers[to].clone(), link.clone())
177 .await
178 .unwrap();
179 oracle
180 .add_link(peers[to].clone(), peers[from].clone(), link)
181 .await
182 .unwrap();
183 }
184
185 async fn setup_and_spawn_actor(
186 context: &deterministic::Context,
187 manager: impl Manager<PublicKey = PublicKey>,
188 blocker: impl Blocker<PublicKey = PublicKey>,
189 signer: impl Signer<PublicKey = PublicKey>,
190 connection: (
191 Sender<PublicKey, deterministic::Context>,
192 Receiver<PublicKey>,
193 ),
194 consumer: Consumer<Key, Bytes>,
195 producer: Producer<Key, Bytes>,
196 ) -> Mailbox<Key, PublicKey> {
197 let public_key = signer.public_key();
198 let (engine, mailbox) = Engine::new(
199 context.with_label(&format!("actor_{public_key}")),
200 Config {
201 manager,
202 blocker,
203 consumer,
204 producer,
205 mailbox_size: MAILBOX_SIZE,
206 me: Some(public_key),
207 initial: INITIAL_DURATION,
208 timeout: TIMEOUT,
209 fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
210 priority_requests: false,
211 priority_responses: false,
212 },
213 );
214 engine.start(connection);
215
216 mailbox
217 }
218
219 #[test_traced]
223 fn test_fetch_success() {
224 let executor = deterministic::Runner::timed(Duration::from_secs(10));
225 executor.start(|context| async move {
226 let (mut oracle, mut schemes, peers, mut connections) =
227 setup_network_and_peers(&context, &[1, 2]).await;
228
229 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
230
231 let key = Key(2);
232 let mut prod2 = Producer::default();
233 prod2.insert(key.clone(), Bytes::from("data for key 2"));
234
235 let (cons1, mut cons_out1) = Consumer::new();
236
237 let scheme = schemes.remove(0);
238 let mut mailbox1 = setup_and_spawn_actor(
239 &context,
240 oracle.manager(),
241 oracle.control(scheme.public_key()),
242 scheme,
243 connections.remove(0),
244 cons1,
245 Producer::default(),
246 )
247 .await;
248
249 let scheme = schemes.remove(0);
250 let _mailbox2 = setup_and_spawn_actor(
251 &context,
252 oracle.manager(),
253 oracle.control(scheme.public_key()),
254 scheme,
255 connections.remove(0),
256 Consumer::dummy(),
257 prod2,
258 )
259 .await;
260
261 mailbox1.fetch(key.clone()).await;
262
263 let event = cons_out1.next().await.unwrap();
264 match event {
265 Event::Success(key_actual, value) => {
266 assert_eq!(key_actual, key);
267 assert_eq!(value, Bytes::from("data for key 2"));
268 }
269 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
270 }
271 });
272 }
273
274 #[test_traced]
278 fn test_cancel_fetch() {
279 let executor = deterministic::Runner::timed(Duration::from_secs(10));
280 executor.start(|context| async move {
281 let (oracle, mut schemes, _peers, mut connections) =
282 setup_network_and_peers(&context, &[1]).await;
283
284 let (cons1, mut cons_out1) = Consumer::new();
285 let prod1 = Producer::default();
286
287 let scheme = schemes.remove(0);
288 let mut mailbox1 = setup_and_spawn_actor(
289 &context,
290 oracle.manager(),
291 oracle.control(scheme.public_key()),
292 scheme,
293 connections.remove(0),
294 cons1,
295 prod1,
296 )
297 .await;
298
299 let key = Key(3);
300 mailbox1.fetch(key.clone()).await;
301 mailbox1.cancel(key.clone()).await;
302
303 let event = cons_out1.next().await.unwrap();
304 match event {
305 Event::Failed(key_actual) => {
306 assert_eq!(key_actual, key);
307 }
308 Event::Success(_, _) => panic!("Fetch should have been canceled"),
309 }
310 });
311 }
312
313 #[test_traced]
318 fn test_peer_no_data() {
319 let executor = deterministic::Runner::timed(Duration::from_secs(10));
320 executor.start(|context| async move {
321 let (mut oracle, mut schemes, peers, mut connections) =
322 setup_network_and_peers(&context, &[1, 2, 3]).await;
323
324 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
325 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
326
327 let prod1 = Producer::default();
328 let prod2 = Producer::default();
329 let mut prod3 = Producer::default();
330 let key = Key(3);
331 prod3.insert(key.clone(), Bytes::from("data for key 3"));
332
333 let (cons1, mut cons_out1) = Consumer::new();
334
335 let scheme = schemes.remove(0);
336 let mut mailbox1 = setup_and_spawn_actor(
337 &context,
338 oracle.manager(),
339 oracle.control(scheme.public_key()),
340 scheme,
341 connections.remove(0),
342 cons1,
343 prod1,
344 )
345 .await;
346
347 let scheme = schemes.remove(0);
348 let _mailbox2 = setup_and_spawn_actor(
349 &context,
350 oracle.manager(),
351 oracle.control(scheme.public_key()),
352 scheme,
353 connections.remove(0),
354 Consumer::dummy(),
355 prod2,
356 )
357 .await;
358
359 let scheme = schemes.remove(0);
360 let _mailbox3 = setup_and_spawn_actor(
361 &context,
362 oracle.manager(),
363 oracle.control(scheme.public_key()),
364 scheme,
365 connections.remove(0),
366 Consumer::dummy(),
367 prod3,
368 )
369 .await;
370
371 mailbox1.fetch(key.clone()).await;
372
373 let event = cons_out1.next().await.unwrap();
374 match event {
375 Event::Success(key_actual, value) => {
376 assert_eq!(key_actual, key);
377 assert_eq!(value, Bytes::from("data for key 3"));
378 }
379 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
380 }
381 });
382 }
383
384 #[test_traced]
389 fn test_no_peers_available() {
390 let executor = deterministic::Runner::timed(Duration::from_secs(10));
391 executor.start(|context| async move {
392 let (oracle, mut schemes, _peers, mut connections) =
393 setup_network_and_peers(&context, &[1]).await;
394
395 let (cons1, mut cons_out1) = Consumer::new();
396 let prod1 = Producer::default();
397
398 let scheme = schemes.remove(0);
399 let mut mailbox1 = setup_and_spawn_actor(
400 &context,
401 oracle.manager(),
402 oracle.control(scheme.public_key()),
403 scheme,
404 connections.remove(0),
405 cons1,
406 prod1,
407 )
408 .await;
409
410 let key = Key(4);
411 mailbox1.fetch(key.clone()).await;
412 context.sleep(Duration::from_secs(5)).await;
413 mailbox1.cancel(key.clone()).await;
414
415 let event = cons_out1.next().await.expect("Consumer channel closed");
416 match event {
417 Event::Failed(key_actual) => {
418 assert_eq!(key_actual, key);
419 }
420 Event::Success(_, _) => {
421 panic!("Fetch should have failed due to no peers")
422 }
423 }
424 });
425 }
426
427 #[test_traced]
431 fn test_concurrent_fetch_requests() {
432 let executor = deterministic::Runner::timed(Duration::from_secs(60));
433 executor.start(|context| async move {
434 let (mut oracle, mut schemes, peers, mut connections) =
435 setup_network_and_peers(&context, &[1, 2, 3]).await;
436
437 let key2 = Key(2);
438 let key3 = Key(3);
439 let mut prod2 = Producer::default();
440 prod2.insert(key2.clone(), Bytes::from("data for key 2"));
441 let mut prod3 = Producer::default();
442 prod3.insert(key3.clone(), Bytes::from("data for key 3"));
443
444 let (cons1, mut cons_out1) = Consumer::new();
445
446 let scheme = schemes.remove(0);
447 let mut mailbox1 = setup_and_spawn_actor(
448 &context,
449 oracle.manager(),
450 oracle.control(scheme.public_key()),
451 scheme,
452 connections.remove(0),
453 cons1,
454 Producer::default(),
455 )
456 .await;
457
458 let scheme = schemes.remove(0);
459 let _mailbox2 = setup_and_spawn_actor(
460 &context,
461 oracle.manager(),
462 oracle.control(scheme.public_key()),
463 scheme,
464 connections.remove(0),
465 Consumer::dummy(),
466 prod2,
467 )
468 .await;
469
470 let scheme = schemes.remove(0);
471 let _mailbox3 = setup_and_spawn_actor(
472 &context,
473 oracle.manager(),
474 oracle.control(scheme.public_key()),
475 scheme,
476 connections.remove(0),
477 Consumer::dummy(),
478 prod3,
479 )
480 .await;
481
482 add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 1).await;
484 add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 2).await;
485
486 for _ in 0..10 {
488 mailbox1.fetch(key2.clone()).await;
490 mailbox1.fetch(key3.clone()).await;
491
492 let mut events = Vec::new();
494 events.push(cons_out1.next().await.expect("Consumer channel closed"));
495 events.push(cons_out1.next().await.expect("Consumer channel closed"));
496
497 let mut found_key2 = false;
499 let mut found_key3 = false;
500 for event in events {
501 match event {
502 Event::Success(key_actual, value) => {
503 if key_actual == key2 {
504 assert_eq!(value, Bytes::from("data for key 2"));
505 found_key2 = true;
506 } else if key_actual == key3 {
507 assert_eq!(value, Bytes::from("data for key 3"));
508 found_key3 = true;
509 } else {
510 panic!("Unexpected key received");
511 }
512 }
513 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
514 }
515 }
516 assert!(found_key2 && found_key3,);
517 }
518 });
519 }
520
521 #[test_traced]
524 fn test_cancel() {
525 let executor = deterministic::Runner::timed(Duration::from_secs(10));
526 executor.start(|context| async move {
527 let (mut oracle, mut schemes, peers, mut connections) =
528 setup_network_and_peers(&context, &[1, 2]).await;
529
530 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
531
532 let key = Key(6);
533 let mut prod2 = Producer::default();
534 prod2.insert(key.clone(), Bytes::from("data for key 6"));
535
536 let (cons1, mut cons_out1) = Consumer::new();
537
538 let scheme = schemes.remove(0);
539 let mut mailbox1 = setup_and_spawn_actor(
540 &context,
541 oracle.manager(),
542 oracle.control(scheme.public_key()),
543 scheme,
544 connections.remove(0),
545 cons1,
546 Producer::default(),
547 )
548 .await;
549
550 let scheme = schemes.remove(0);
551 let _mailbox2 = setup_and_spawn_actor(
552 &context,
553 oracle.manager(),
554 oracle.control(scheme.public_key()),
555 scheme,
556 connections.remove(0),
557 Consumer::dummy(),
558 prod2,
559 )
560 .await;
561
562 mailbox1.cancel(key.clone()).await;
564 select! {
565 _ = cons_out1.next() => { panic!("unexpected event"); },
566 _ = context.sleep(Duration::from_millis(100)) => {},
567 };
568
569 mailbox1.fetch(key.clone()).await;
571 let event = cons_out1.next().await.unwrap();
572 match event {
573 Event::Success(key_actual, value) => {
574 assert_eq!(key_actual, key);
575 assert_eq!(value, Bytes::from("data for key 6"));
576 }
577 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
578 }
579
580 mailbox1.cancel(key.clone()).await;
582 select! {
583 _ = cons_out1.next() => { panic!("unexpected event"); },
584 _ = context.sleep(Duration::from_millis(100)) => {},
585 };
586
587 let key = Key(7);
589 mailbox1.fetch(key.clone()).await;
590 mailbox1.cancel(key.clone()).await;
591
592 let event = cons_out1.next().await.unwrap();
594 match event {
595 Event::Failed(key_actual) => {
596 assert_eq!(key_actual, key);
597 }
598 Event::Success(_, _) => panic!("Fetch should have been canceled"),
599 }
600 });
601 }
602
603 #[test_traced]
606 fn test_blocking_peer() {
607 let executor = deterministic::Runner::timed(Duration::from_secs(10));
608 executor.start(|context| async move {
609 let (mut oracle, mut schemes, peers, mut connections) =
610 setup_network_and_peers(&context, &[1, 2, 3]).await;
611
612 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
613 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
614 add_link(&mut oracle, LINK.clone(), &peers, 1, 2).await;
615
616 let key_a = Key(1);
617 let key_b = Key(2);
618 let invalid_data_a = Bytes::from("invalid for A");
619 let valid_data_a = Bytes::from("valid for A");
620 let valid_data_b = Bytes::from("valid for B");
621
622 let mut prod2 = Producer::default();
624 prod2.insert(key_a.clone(), invalid_data_a.clone());
625 prod2.insert(key_b.clone(), valid_data_b.clone());
626
627 let mut prod3 = Producer::default();
628 prod3.insert(key_a.clone(), valid_data_a.clone());
629
630 let (mut cons1, mut cons_out1) = Consumer::new();
632 cons1.add_expected(key_a.clone(), valid_data_a.clone());
633 cons1.add_expected(key_b.clone(), valid_data_b.clone());
634
635 let scheme = schemes.remove(0);
637 let mut mailbox1 = setup_and_spawn_actor(
638 &context,
639 oracle.manager(),
640 oracle.control(scheme.public_key()),
641 scheme,
642 connections.remove(0),
643 cons1,
644 Producer::default(),
645 )
646 .await;
647
648 let scheme = schemes.remove(0);
649 let _mailbox2 = setup_and_spawn_actor(
650 &context,
651 oracle.manager(),
652 oracle.control(scheme.public_key()),
653 scheme,
654 connections.remove(0),
655 Consumer::dummy(),
656 prod2,
657 )
658 .await;
659
660 let scheme = schemes.remove(0);
661 let _mailbox3 = setup_and_spawn_actor(
662 &context,
663 oracle.manager(),
664 oracle.control(scheme.public_key()),
665 scheme,
666 connections.remove(0),
667 Consumer::dummy(),
668 prod3,
669 )
670 .await;
671
672 for _ in 0..20 {
674 mailbox1.fetch(key_a.clone()).await;
676
677 let event = cons_out1.next().await.unwrap();
679 match event {
680 Event::Success(key_actual, value) => {
681 assert_eq!(key_actual, key_a);
682 assert_eq!(value, valid_data_a);
683 }
684 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
685 }
686 }
687
688 mailbox1.fetch(key_b.clone()).await;
690
691 context.sleep(Duration::from_secs(5)).await;
693
694 mailbox1.cancel(key_b.clone()).await;
696
697 let event = cons_out1.next().await.unwrap();
699 match event {
700 Event::Failed(key_actual) => {
701 assert_eq!(key_actual, key_b);
702 }
703 Event::Success(_, _) => panic!("Fetch should have been canceled"),
704 }
705
706 let blocked = oracle.blocked().await.unwrap();
708 assert_eq!(blocked.len(), 1);
709 assert_eq!(blocked[0].0, peers[0]);
710 assert_eq!(blocked[0].1, peers[1]);
711 });
712 }
713
714 #[test_traced]
718 fn test_duplicate_fetch_request() {
719 let executor = deterministic::Runner::timed(Duration::from_secs(10));
720 executor.start(|context| async move {
721 let (mut oracle, mut schemes, peers, mut connections) =
722 setup_network_and_peers(&context, &[1, 2]).await;
723
724 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
725
726 let key = Key(5);
727 let mut prod2 = Producer::default();
728 prod2.insert(key.clone(), Bytes::from("data for key 5"));
729
730 let (cons1, mut cons_out1) = Consumer::new();
731
732 let scheme = schemes.remove(0);
733 let mut mailbox1 = setup_and_spawn_actor(
734 &context,
735 oracle.manager(),
736 oracle.control(scheme.public_key()),
737 scheme,
738 connections.remove(0),
739 cons1,
740 Producer::default(),
741 )
742 .await;
743
744 let scheme = schemes.remove(0);
745 let _mailbox2 = setup_and_spawn_actor(
746 &context,
747 oracle.manager(),
748 oracle.control(scheme.public_key()),
749 scheme,
750 connections.remove(0),
751 Consumer::dummy(),
752 prod2,
753 )
754 .await;
755
756 mailbox1.fetch(key.clone()).await;
758 mailbox1.fetch(key.clone()).await;
759
760 let event = cons_out1.next().await.unwrap();
762 match event {
763 Event::Success(key_actual, value) => {
764 assert_eq!(key_actual, key);
765 assert_eq!(value, Bytes::from("data for key 5"));
766 }
767 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
768 }
769
770 select! {
772 _ = cons_out1.next() => {
773 panic!("Unexpected second event received for duplicate fetch");
774 },
775 _ = context.sleep(Duration::from_millis(500)) => {
776 },
778 };
779 });
780 }
781
782 #[test_traced]
786 fn test_changing_peer_sets() {
787 let executor = deterministic::Runner::timed(Duration::from_secs(10));
788 executor.start(|context| async move {
789 let (mut oracle, mut schemes, peers, mut connections) =
790 setup_network_and_peers(&context, &[1, 2, 3]).await;
791
792 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
793 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
794
795 let key1 = Key(1);
796 let key2 = Key(2);
797
798 let mut prod2 = Producer::default();
799 prod2.insert(key1.clone(), Bytes::from("data from peer 2"));
800
801 let mut prod3 = Producer::default();
802 prod3.insert(key2.clone(), Bytes::from("data from peer 3"));
803
804 let (cons1, mut cons_out1) = Consumer::new();
805
806 let scheme = schemes.remove(0);
807 let mut mailbox1 = setup_and_spawn_actor(
808 &context,
809 oracle.manager(),
810 oracle.control(scheme.public_key()),
811 scheme,
812 connections.remove(0),
813 cons1,
814 Producer::default(),
815 )
816 .await;
817
818 let scheme = schemes.remove(0);
819 let _mailbox2 = setup_and_spawn_actor(
820 &context,
821 oracle.manager(),
822 oracle.control(scheme.public_key()),
823 scheme,
824 connections.remove(0),
825 Consumer::dummy(),
826 prod2,
827 )
828 .await;
829
830 mailbox1.fetch(key1.clone()).await;
832
833 let event = cons_out1.next().await.unwrap();
835 match event {
836 Event::Success(key_actual, value) => {
837 assert_eq!(key_actual, key1);
838 assert_eq!(value, Bytes::from("data from peer 2"));
839 }
840 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
841 }
842
843 let scheme = schemes.remove(0);
845 let _mailbox3 = setup_and_spawn_actor(
846 &context,
847 oracle.manager(),
848 oracle.control(scheme.public_key()),
849 scheme,
850 connections.remove(0),
851 Consumer::dummy(),
852 prod3,
853 )
854 .await;
855
856 context.sleep(Duration::from_millis(200)).await;
858
859 mailbox1.fetch(key2.clone()).await;
861
862 let event = cons_out1.next().await.unwrap();
864 match event {
865 Event::Success(key_actual, value) => {
866 assert_eq!(key_actual, key2);
867 assert_eq!(value, Bytes::from("data from peer 3"));
868 }
869 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
870 }
871 });
872 }
873
874 #[test_traced]
875 fn test_fetch_targeted() {
876 let executor = deterministic::Runner::timed(Duration::from_secs(10));
877 executor.start(|context| async move {
878 let (mut oracle, mut schemes, peers, mut connections) =
879 setup_network_and_peers(&context, &[1, 2, 3]).await;
880
881 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
882 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
883
884 let key = Key(1);
885 let invalid_data = Bytes::from("invalid data");
886 let valid_data = Bytes::from("valid data");
887
888 let mut prod2 = Producer::default();
890 prod2.insert(key.clone(), invalid_data.clone());
891
892 let mut prod3 = Producer::default();
893 prod3.insert(key.clone(), valid_data.clone());
894
895 let (mut cons1, mut cons_out1) = Consumer::new();
897 cons1.add_expected(key.clone(), valid_data.clone());
898
899 let scheme = schemes.remove(0);
900 let mut mailbox1 = setup_and_spawn_actor(
901 &context,
902 oracle.manager(),
903 oracle.control(scheme.public_key()),
904 scheme,
905 connections.remove(0),
906 cons1,
907 Producer::default(),
908 )
909 .await;
910
911 let scheme = schemes.remove(0);
912 let _mailbox2 = setup_and_spawn_actor(
913 &context,
914 oracle.manager(),
915 oracle.control(scheme.public_key()),
916 scheme,
917 connections.remove(0),
918 Consumer::dummy(),
919 prod2,
920 )
921 .await;
922
923 let scheme = schemes.remove(0);
924 let _mailbox3 = setup_and_spawn_actor(
925 &context,
926 oracle.manager(),
927 oracle.control(scheme.public_key()),
928 scheme,
929 connections.remove(0),
930 Consumer::dummy(),
931 prod3,
932 )
933 .await;
934
935 context.sleep(Duration::from_millis(100)).await;
937
938 mailbox1
942 .fetch_targeted(
943 key.clone(),
944 non_empty_vec![peers[1].clone(), peers[2].clone()],
945 )
946 .await;
947
948 let event = cons_out1.next().await.unwrap();
950 match event {
951 Event::Success(key_actual, value) => {
952 assert_eq!(key_actual, key);
953 assert_eq!(value, valid_data);
954 }
955 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
956 }
957
958 let blocked = oracle.blocked().await.unwrap();
960 assert_eq!(blocked.len(), 1);
961 assert_eq!(blocked[0].0, peers[0]);
962 assert_eq!(blocked[0].1, peers[1]);
963
964 let metrics = context.encode();
966 assert!(metrics.contains("_fetch_total{status=\"Success\"} 1"));
967 });
968 }
969
970 #[test_traced]
971 fn test_fetch_targeted_no_fallback() {
972 let executor = deterministic::Runner::timed(Duration::from_secs(10));
973 executor.start(|context| async move {
974 let (mut oracle, mut schemes, peers, mut connections) =
975 setup_network_and_peers(&context, &[1, 2, 3, 4]).await;
976
977 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
978 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
979 add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;
980
981 let key = Key(1);
982
983 let mut prod4 = Producer::default();
985 prod4.insert(key.clone(), Bytes::from("data from peer 4"));
986
987 let (cons1, mut cons_out1) = Consumer::new();
988
989 let scheme = schemes.remove(0);
990 let mut mailbox1 = setup_and_spawn_actor(
991 &context,
992 oracle.manager(),
993 oracle.control(scheme.public_key()),
994 scheme,
995 connections.remove(0),
996 cons1,
997 Producer::default(),
998 )
999 .await;
1000
1001 let scheme = schemes.remove(0);
1002 let _mailbox2 = setup_and_spawn_actor(
1003 &context,
1004 oracle.manager(),
1005 oracle.control(scheme.public_key()),
1006 scheme,
1007 connections.remove(0),
1008 Consumer::dummy(),
1009 Producer::default(), )
1011 .await;
1012
1013 let scheme = schemes.remove(0);
1014 let _mailbox3 = setup_and_spawn_actor(
1015 &context,
1016 oracle.manager(),
1017 oracle.control(scheme.public_key()),
1018 scheme,
1019 connections.remove(0),
1020 Consumer::dummy(),
1021 Producer::default(), )
1023 .await;
1024
1025 let scheme = schemes.remove(0);
1026 let _mailbox4 = setup_and_spawn_actor(
1027 &context,
1028 oracle.manager(),
1029 oracle.control(scheme.public_key()),
1030 scheme,
1031 connections.remove(0),
1032 Consumer::dummy(),
1033 prod4,
1034 )
1035 .await;
1036
1037 context.sleep(Duration::from_millis(100)).await;
1039
1040 mailbox1
1043 .fetch_targeted(
1044 key.clone(),
1045 non_empty_vec![peers[1].clone(), peers[2].clone()],
1046 )
1047 .await;
1048
1049 select! {
1052 event = cons_out1.next() => {
1053 panic!("Fetch should not succeed, but got: {event:?}");
1054 },
1055 _ = context.sleep(Duration::from_secs(3)) => {
1056 },
1058 };
1059 });
1060 }
1061
1062 #[test_traced]
1063 fn test_fetch_all_targeted() {
1064 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1065 executor.start(|context| async move {
1066 let (mut oracle, mut schemes, peers, mut connections) =
1067 setup_network_and_peers(&context, &[1, 2, 3, 4]).await;
1068
1069 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1070 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1071 add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;
1072
1073 let key1 = Key(1);
1074 let key2 = Key(2);
1075 let key3 = Key(3);
1076
1077 let mut prod2 = Producer::default();
1079 prod2.insert(key1.clone(), Bytes::from("data for key 1"));
1080
1081 let mut prod3 = Producer::default();
1083 prod3.insert(key3.clone(), Bytes::from("data for key 3"));
1084
1085 let mut prod4 = Producer::default();
1087 prod4.insert(key2.clone(), Bytes::from("data for key 2"));
1088
1089 let (mut cons1, mut cons_out1) = Consumer::new();
1091 cons1.add_expected(key1.clone(), Bytes::from("data for key 1"));
1092 cons1.add_expected(key2.clone(), Bytes::from("data for key 2"));
1093 cons1.add_expected(key3.clone(), Bytes::from("data for key 3"));
1094
1095 let scheme = schemes.remove(0);
1096 let mut mailbox1 = setup_and_spawn_actor(
1097 &context,
1098 oracle.manager(),
1099 oracle.control(scheme.public_key()),
1100 scheme,
1101 connections.remove(0),
1102 cons1,
1103 Producer::default(),
1104 )
1105 .await;
1106
1107 let scheme = schemes.remove(0);
1108 let _mailbox2 = setup_and_spawn_actor(
1109 &context,
1110 oracle.manager(),
1111 oracle.control(scheme.public_key()),
1112 scheme,
1113 connections.remove(0),
1114 Consumer::dummy(),
1115 prod2,
1116 )
1117 .await;
1118
1119 let scheme = schemes.remove(0);
1120 let _mailbox3 = setup_and_spawn_actor(
1121 &context,
1122 oracle.manager(),
1123 oracle.control(scheme.public_key()),
1124 scheme,
1125 connections.remove(0),
1126 Consumer::dummy(),
1127 prod3,
1128 )
1129 .await;
1130
1131 let scheme = schemes.remove(0);
1132 let _mailbox4 = setup_and_spawn_actor(
1133 &context,
1134 oracle.manager(),
1135 oracle.control(scheme.public_key()),
1136 scheme,
1137 connections.remove(0),
1138 Consumer::dummy(),
1139 prod4,
1140 )
1141 .await;
1142
1143 context.sleep(Duration::from_millis(100)).await;
1145
1146 mailbox1
1151 .fetch_all_targeted(vec![
1152 (key1.clone(), non_empty_vec![peers[1].clone()]), (key2.clone(), non_empty_vec![peers[3].clone()]), ])
1155 .await;
1156 mailbox1.fetch(key3.clone()).await; let mut results = HashMap::new();
1160 for _ in 0..3 {
1161 let event = cons_out1.next().await.unwrap();
1162 match event {
1163 Event::Success(key, value) => {
1164 results.insert(key, value);
1165 }
1166 Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
1167 }
1168 }
1169
1170 assert_eq!(results.len(), 3);
1172 assert_eq!(results.get(&key1).unwrap(), &Bytes::from("data for key 1"));
1173 assert_eq!(results.get(&key2).unwrap(), &Bytes::from("data for key 2"));
1174 assert_eq!(results.get(&key3).unwrap(), &Bytes::from("data for key 3"));
1175
1176 let metrics = context.encode();
1178 assert!(metrics.contains("_fetch_total{status=\"Success\"} 3"));
1179 });
1180 }
1181
1182 #[test_traced]
1185 fn test_fetch_clears_targets() {
1186 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1187 executor.start(|context| async move {
1188 let (mut oracle, mut schemes, peers, mut connections) =
1189 setup_network_and_peers(&context, &[1, 2, 3]).await;
1190
1191 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1192 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1193
1194 let key = Key(1);
1195 let valid_data = Bytes::from("valid data");
1196
1197 let mut prod3 = Producer::default();
1199 prod3.insert(key.clone(), valid_data.clone());
1200
1201 let (cons1, mut cons_out1) = Consumer::new();
1202
1203 let scheme = schemes.remove(0);
1204 let mut mailbox1 = setup_and_spawn_actor(
1205 &context,
1206 oracle.manager(),
1207 oracle.control(scheme.public_key()),
1208 scheme,
1209 connections.remove(0),
1210 cons1,
1211 Producer::default(),
1212 )
1213 .await;
1214
1215 let scheme = schemes.remove(0);
1216 let _mailbox2 = setup_and_spawn_actor(
1217 &context,
1218 oracle.manager(),
1219 oracle.control(scheme.public_key()),
1220 scheme,
1221 connections.remove(0),
1222 Consumer::dummy(),
1223 Producer::default(), )
1225 .await;
1226
1227 let scheme = schemes.remove(0);
1228 let _mailbox3 = setup_and_spawn_actor(
1229 &context,
1230 oracle.manager(),
1231 oracle.control(scheme.public_key()),
1232 scheme,
1233 connections.remove(0),
1234 Consumer::dummy(),
1235 prod3,
1236 )
1237 .await;
1238
1239 context.sleep(Duration::from_millis(100)).await;
1241
1242 mailbox1
1244 .fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()])
1245 .await;
1246
1247 context.sleep(Duration::from_millis(500)).await;
1249
1250 mailbox1.fetch(key.clone()).await;
1252
1253 let event = cons_out1.next().await.unwrap();
1255 match event {
1256 Event::Success(key_actual, value) => {
1257 assert_eq!(key_actual, key);
1258 assert_eq!(value, valid_data);
1259 }
1260 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1261 }
1262 });
1263 }
1264
1265 #[test_traced]
1266 fn test_fetch_targeted_does_not_restrict_all() {
1267 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1268 executor.start(|context| async move {
1269 let (mut oracle, mut schemes, peers, mut connections) =
1270 setup_network_and_peers(&context, &[1, 2, 3]).await;
1271
1272 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1273 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1274
1275 let key = Key(1);
1276 let valid_data = Bytes::from("valid data");
1277
1278 let mut prod3 = Producer::default();
1280 prod3.insert(key.clone(), valid_data.clone());
1281
1282 let (cons1, mut cons_out1) = Consumer::new();
1283
1284 let scheme = schemes.remove(0);
1285 let mut mailbox1 = setup_and_spawn_actor(
1286 &context,
1287 oracle.manager(),
1288 oracle.control(scheme.public_key()),
1289 scheme,
1290 connections.remove(0),
1291 cons1,
1292 Producer::default(),
1293 )
1294 .await;
1295
1296 let scheme = schemes.remove(0);
1297 let _mailbox2 = setup_and_spawn_actor(
1298 &context,
1299 oracle.manager(),
1300 oracle.control(scheme.public_key()),
1301 scheme,
1302 connections.remove(0),
1303 Consumer::dummy(),
1304 Producer::default(), )
1306 .await;
1307
1308 let scheme = schemes.remove(0);
1309 let _mailbox3 = setup_and_spawn_actor(
1310 &context,
1311 oracle.manager(),
1312 oracle.control(scheme.public_key()),
1313 scheme,
1314 connections.remove(0),
1315 Consumer::dummy(),
1316 prod3,
1317 )
1318 .await;
1319
1320 context.sleep(Duration::from_millis(100)).await;
1322
1323 mailbox1.fetch(key.clone()).await;
1325
1326 context.sleep(Duration::from_millis(50)).await;
1328
1329 mailbox1
1332 .fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()])
1333 .await;
1334
1335 let event = cons_out1.next().await.unwrap();
1338 match event {
1339 Event::Success(key_actual, value) => {
1340 assert_eq!(key_actual, key);
1341 assert_eq!(value, valid_data);
1342 }
1343 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1344 }
1345 });
1346 }
1347
1348 #[test_traced]
1349 fn test_retain() {
1350 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1351 executor.start(|context| async move {
1352 let (mut oracle, mut schemes, peers, mut connections) =
1353 setup_network_and_peers(&context, &[1, 2]).await;
1354
1355 let key = Key(5);
1356 let mut prod2 = Producer::default();
1357 prod2.insert(key.clone(), Bytes::from("data for key 5"));
1358
1359 let (cons1, mut cons_out1) = Consumer::new();
1360
1361 let scheme = schemes.remove(0);
1362 let mut mailbox1 = setup_and_spawn_actor(
1363 &context,
1364 oracle.manager(),
1365 oracle.control(scheme.public_key()),
1366 scheme,
1367 connections.remove(0),
1368 cons1,
1369 Producer::default(),
1370 )
1371 .await;
1372
1373 let scheme = schemes.remove(0);
1374 let _mailbox2 = setup_and_spawn_actor(
1375 &context,
1376 oracle.manager(),
1377 oracle.control(scheme.public_key()),
1378 scheme,
1379 connections.remove(0),
1380 Consumer::dummy(),
1381 prod2,
1382 )
1383 .await;
1384
1385 mailbox1.retain(|_| true).await;
1387 select! {
1388 _ = cons_out1.next() => { panic!("unexpected event"); },
1389 _ = context.sleep(Duration::from_millis(100)) => {},
1390 };
1391
1392 mailbox1.fetch(key.clone()).await;
1394
1395 let key_clone = key.clone();
1398 mailbox1.retain(move |k| k != &key_clone).await;
1399
1400 let event = cons_out1.next().await.unwrap();
1402 match event {
1403 Event::Failed(key_actual) => {
1404 assert_eq!(key_actual, key);
1405 }
1406 Event::Success(_, _) => panic!("Fetch should have been retained out"),
1407 }
1408
1409 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1411
1412 mailbox1.fetch(key.clone()).await;
1415
1416 let event = cons_out1.next().await.unwrap();
1418 match event {
1419 Event::Success(key_actual, value) => {
1420 assert_eq!(key_actual, key);
1421 assert_eq!(value, Bytes::from("data for key 5"));
1422 }
1423 Event::Failed(_) => unreachable!(),
1424 }
1425 });
1426 }
1427
1428 #[test_traced]
1429 fn test_clear() {
1430 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1431 executor.start(|context| async move {
1432 let (mut oracle, mut schemes, peers, mut connections) =
1433 setup_network_and_peers(&context, &[1, 2]).await;
1434
1435 let key = Key(6);
1437 let mut prod2 = Producer::default();
1438 prod2.insert(key.clone(), Bytes::from("data for key 6"));
1439
1440 let (cons1, mut cons_out1) = Consumer::new();
1441
1442 let scheme = schemes.remove(0);
1443 let mut mailbox1 = setup_and_spawn_actor(
1444 &context,
1445 oracle.manager(),
1446 oracle.control(scheme.public_key()),
1447 scheme,
1448 connections.remove(0),
1449 cons1,
1450 Producer::default(),
1451 )
1452 .await;
1453
1454 let scheme = schemes.remove(0);
1455 let _mailbox2 = setup_and_spawn_actor(
1456 &context,
1457 oracle.manager(),
1458 oracle.control(scheme.public_key()),
1459 scheme,
1460 connections.remove(0),
1461 Consumer::dummy(),
1462 prod2,
1463 )
1464 .await;
1465
1466 mailbox1.clear().await;
1468 select! {
1469 _ = cons_out1.next() => { panic!("unexpected event"); },
1470 _ = context.sleep(Duration::from_millis(100)) => {},
1471 };
1472
1473 mailbox1.fetch(key.clone()).await;
1475
1476 mailbox1.clear().await;
1478
1479 let event = cons_out1.next().await.unwrap();
1481 match event {
1482 Event::Failed(key_actual) => {
1483 assert_eq!(key_actual, key);
1484 }
1485 Event::Success(_, _) => panic!("Fetch should have been cleared"),
1486 }
1487
1488 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1490
1491 mailbox1.fetch(key.clone()).await;
1494
1495 let event = cons_out1.next().await.unwrap();
1497 match event {
1498 Event::Success(key_actual, value) => {
1499 assert_eq!(key_actual, key);
1500 assert_eq!(value, Bytes::from("data for key 6"));
1501 }
1502 Event::Failed(_) => unreachable!(),
1503 }
1504 });
1505 }
1506
1507 #[test_traced]
1511 fn test_rate_limit_spillover() {
1512 let executor = deterministic::Runner::timed(Duration::from_secs(30));
1513 executor.start(|context| async move {
1514 let (mut oracle, mut schemes, peers, mut connections) =
1516 setup_network_and_peers_with_rate_limit(
1517 &context,
1518 &[1, 2, 3],
1519 Quota::per_second(NZU32!(1)),
1520 )
1521 .await;
1522
1523 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1525 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1526
1527 let mut prod2 = Producer::default();
1529 let mut prod3 = Producer::default();
1530 prod2.insert(Key(0), Bytes::from("data for key 0"));
1531 prod2.insert(Key(1), Bytes::from("data for key 1"));
1532 prod3.insert(Key(0), Bytes::from("data for key 0"));
1533 prod3.insert(Key(1), Bytes::from("data for key 1"));
1534
1535 let (cons1, mut cons_out1) = Consumer::new();
1536
1537 let scheme = schemes.remove(0);
1539 let mut mailbox1 = setup_and_spawn_actor(
1540 &context,
1541 oracle.manager(),
1542 oracle.control(scheme.public_key()),
1543 scheme,
1544 connections.remove(0),
1545 cons1,
1546 Producer::default(),
1547 )
1548 .await;
1549
1550 let scheme = schemes.remove(0);
1552 let _mailbox2 = setup_and_spawn_actor(
1553 &context,
1554 oracle.manager(),
1555 oracle.control(scheme.public_key()),
1556 scheme,
1557 connections.remove(0),
1558 Consumer::dummy(),
1559 prod2,
1560 )
1561 .await;
1562
1563 let scheme = schemes.remove(0);
1565 let _mailbox3 = setup_and_spawn_actor(
1566 &context,
1567 oracle.manager(),
1568 oracle.control(scheme.public_key()),
1569 scheme,
1570 connections.remove(0),
1571 Consumer::dummy(),
1572 prod3,
1573 )
1574 .await;
1575
1576 context.sleep(Duration::from_millis(100)).await;
1578 let start = context.current();
1579
1580 mailbox1.fetch(Key(0)).await;
1584 mailbox1.fetch(Key(1)).await;
1585
1586 let mut results = HashMap::new();
1588 for _ in 0..2 {
1589 let event = cons_out1.next().await.unwrap();
1590 match event {
1591 Event::Success(key, value) => {
1592 results.insert(key.clone(), value);
1593 }
1594 Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
1595 }
1596 }
1597
1598 assert_eq!(results.len(), 2);
1600 assert_eq!(
1601 results.get(&Key(0)).unwrap(),
1602 &Bytes::from("data for key 0")
1603 );
1604 assert_eq!(
1605 results.get(&Key(1)).unwrap(),
1606 &Bytes::from("data for key 1")
1607 );
1608
1609 let elapsed = context.current().duration_since(start).unwrap();
1612 assert!(
1613 elapsed < Duration::from_millis(500),
1614 "Expected quick completion via spill-over, but took {elapsed:?}"
1615 );
1616 });
1617 }
1618
1619 #[test_traced]
1623 fn test_rate_limit_retry_after_reset() {
1624 let executor = deterministic::Runner::timed(Duration::from_secs(30));
1625 executor.start(|context| async move {
1626 let (mut oracle, mut schemes, peers, mut connections) =
1628 setup_network_and_peers_with_rate_limit(
1629 &context,
1630 &[1, 2],
1631 Quota::per_second(NZU32!(1)),
1632 )
1633 .await;
1634
1635 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1636
1637 let mut prod2 = Producer::default();
1639 prod2.insert(Key(1), Bytes::from("data for key 1"));
1640 prod2.insert(Key(2), Bytes::from("data for key 2"));
1641 prod2.insert(Key(3), Bytes::from("data for key 3"));
1642
1643 let (cons1, mut cons_out1) = Consumer::new();
1644
1645 let scheme = schemes.remove(0);
1646 let mut mailbox1 = setup_and_spawn_actor(
1647 &context,
1648 oracle.manager(),
1649 oracle.control(scheme.public_key()),
1650 scheme,
1651 connections.remove(0),
1652 cons1,
1653 Producer::default(),
1654 )
1655 .await;
1656
1657 let scheme = schemes.remove(0);
1658 let _mailbox2 = setup_and_spawn_actor(
1659 &context,
1660 oracle.manager(),
1661 oracle.control(scheme.public_key()),
1662 scheme,
1663 connections.remove(0),
1664 Consumer::dummy(),
1665 prod2,
1666 )
1667 .await;
1668
1669 context.sleep(Duration::from_millis(100)).await;
1671 let start = context.current();
1672
1673 mailbox1.fetch(Key(1)).await;
1676 mailbox1.fetch(Key(2)).await;
1677 mailbox1.fetch(Key(3)).await;
1678
1679 let mut results = HashMap::new();
1681 for _ in 0..3 {
1682 let event = cons_out1.next().await.unwrap();
1683 match event {
1684 Event::Success(key, value) => {
1685 results.insert(key.clone(), value);
1686 }
1687 Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
1688 }
1689 }
1690
1691 assert_eq!(results.len(), 3);
1692 for i in 1..=3 {
1693 assert_eq!(
1694 results.get(&Key(i)).unwrap(),
1695 &Bytes::from(format!("data for key {}", i))
1696 );
1697 }
1698
1699 let elapsed = context.current().duration_since(start).unwrap();
1703 assert!(
1704 elapsed > Duration::from_secs(2),
1705 "Expected rate limiting to cause delay > 2s, but took {elapsed:?}"
1706 );
1707 });
1708 }
1709
1710 #[test_traced]
1714 fn test_self_exclusion() {
1715 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1716 executor.start(|context| async move {
1717 let (mut oracle, mut schemes, peers, mut connections) =
1718 setup_network_and_peers(&context, &[1, 2]).await;
1719
1720 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1721
1722 let key = Key(1);
1723 let data = Bytes::from("shared data");
1724
1725 let mut prod1 = Producer::default();
1727 prod1.insert(key.clone(), data.clone());
1728 let mut prod2 = Producer::default();
1729 prod2.insert(key.clone(), data.clone());
1730
1731 let (cons1, mut cons_out1) = Consumer::new();
1732
1733 let scheme = schemes.remove(0);
1735 let mut mailbox1 = setup_and_spawn_actor(
1736 &context,
1737 oracle.manager(),
1738 oracle.control(scheme.public_key()),
1739 scheme,
1740 connections.remove(0),
1741 cons1,
1742 prod1, )
1744 .await;
1745
1746 let scheme = schemes.remove(0);
1748 let _mailbox2 = setup_and_spawn_actor(
1749 &context,
1750 oracle.manager(),
1751 oracle.control(scheme.public_key()),
1752 scheme,
1753 connections.remove(0),
1754 Consumer::dummy(),
1755 prod2,
1756 )
1757 .await;
1758
1759 context.sleep(Duration::from_millis(100)).await;
1761
1762 mailbox1.fetch(key.clone()).await;
1764
1765 let event = cons_out1.next().await.unwrap();
1767 match event {
1768 Event::Success(key_actual, value) => {
1769 assert_eq!(key_actual, key);
1770 assert_eq!(value, data);
1771 }
1772 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1773 }
1774 });
1775 }
1776
1777 #[allow(clippy::type_complexity)]
1778 fn spawn_actors_with_handles(
1779 context: deterministic::Context,
1780 oracle: &Oracle<PublicKey, deterministic::Context>,
1781 schemes: Vec<PrivateKey>,
1782 connections: Vec<(
1783 Sender<PublicKey, deterministic::Context>,
1784 Receiver<PublicKey>,
1785 )>,
1786 consumers: Vec<Consumer<Key, Bytes>>,
1787 producers: Vec<Producer<Key, Bytes>>,
1788 ) -> (
1789 Vec<Mailbox<Key, PublicKey>>,
1790 Vec<commonware_runtime::Handle<()>>,
1791 ) {
1792 let actor_context = context.with_label("actor");
1793 let mut mailboxes = Vec::new();
1794 let mut handles = Vec::new();
1795
1796 for (idx, ((scheme, conn), (consumer, producer))) in schemes
1797 .into_iter()
1798 .zip(connections)
1799 .zip(consumers.into_iter().zip(producers))
1800 .enumerate()
1801 {
1802 let ctx = actor_context.with_label(&format!("peer_{idx}"));
1803 let public_key = scheme.public_key();
1804 let (engine, mailbox) = Engine::new(
1805 ctx,
1806 Config {
1807 manager: oracle.manager(),
1808 blocker: oracle.control(public_key.clone()),
1809 consumer,
1810 producer,
1811 mailbox_size: MAILBOX_SIZE,
1812 me: Some(public_key),
1813 initial: INITIAL_DURATION,
1814 timeout: TIMEOUT,
1815 fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
1816 priority_requests: false,
1817 priority_responses: false,
1818 },
1819 );
1820 handles.push(engine.start(conn));
1821 mailboxes.push(mailbox);
1822 }
1823
1824 (mailboxes, handles)
1825 }
1826
1827 #[test_traced]
1828 fn test_operations_after_shutdown_do_not_panic() {
1829 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1830 executor.start(|context| async move {
1831 let (mut oracle, schemes, peers, connections) =
1832 setup_network_and_peers(&context, &[1, 2]).await;
1833
1834 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1835
1836 let key = Key(1);
1837 let mut prod2 = Producer::default();
1838 prod2.insert(key.clone(), Bytes::from("data for key 1"));
1839
1840 let (cons1, mut cons_out1) = Consumer::new();
1841
1842 let (mut mailboxes, handles) = spawn_actors_with_handles(
1843 context.clone(),
1844 &oracle,
1845 schemes,
1846 connections,
1847 vec![cons1, Consumer::dummy()],
1848 vec![Producer::default(), prod2],
1849 );
1850
1851 mailboxes[0].fetch(key.clone()).await;
1853 let event = cons_out1.next().await.unwrap();
1854 match event {
1855 Event::Success(_, value) => assert_eq!(value, Bytes::from("data for key 1")),
1856 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1857 }
1858
1859 for handle in handles {
1861 handle.abort();
1862 }
1863 context.sleep(Duration::from_millis(100)).await;
1864
1865 let key2 = Key(2);
1869 mailboxes[0].fetch(key2.clone()).await;
1870
1871 mailboxes[0].cancel(key2.clone()).await;
1873
1874 mailboxes[0].clear().await;
1876
1877 mailboxes[0].retain(|_| true).await;
1879
1880 mailboxes[0]
1882 .fetch_targeted(Key(3), non_empty_vec![peers[1].clone()])
1883 .await;
1884 });
1885 }
1886
1887 fn clean_shutdown(seed: u64) {
1888 let cfg = deterministic::Config::default()
1889 .with_seed(seed)
1890 .with_timeout(Some(Duration::from_secs(30)));
1891 let executor = deterministic::Runner::new(cfg);
1892 executor.start(|context| async move {
1893 let (mut oracle, schemes, peers, connections) =
1894 setup_network_and_peers(&context, &[1, 2]).await;
1895
1896 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1897
1898 let key = Key(1);
1899 let mut prod2 = Producer::default();
1900 prod2.insert(key.clone(), Bytes::from("data for key 1"));
1901
1902 let (cons1, mut cons_out1) = Consumer::new();
1903
1904 let (mut mailboxes, handles) = spawn_actors_with_handles(
1905 context.clone(),
1906 &oracle,
1907 schemes,
1908 connections,
1909 vec![cons1, Consumer::dummy()],
1910 vec![Producer::default(), prod2],
1911 );
1912
1913 context.sleep(Duration::from_millis(100)).await;
1915
1916 let running_before = count_running_tasks(&context, "actor");
1918 assert!(
1919 running_before > 0,
1920 "at least one actor task should be running"
1921 );
1922
1923 mailboxes[0].fetch(key.clone()).await;
1925 let event = cons_out1.next().await.unwrap();
1926 match event {
1927 Event::Success(_, value) => assert_eq!(value, Bytes::from("data for key 1")),
1928 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1929 }
1930
1931 for handle in handles {
1933 handle.abort();
1934 }
1935 context.sleep(Duration::from_millis(100)).await;
1936
1937 let running_after = count_running_tasks(&context, "actor");
1939 assert_eq!(
1940 running_after, 0,
1941 "all actor tasks should be stopped, but {running_after} still running"
1942 );
1943 });
1944 }
1945
1946 #[test]
1947 fn test_clean_shutdown() {
1948 for seed in 0..25 {
1949 clean_shutdown(seed);
1950 }
1951 }
1952}